|
@@ -11,13 +11,14 @@ import com.judong.chuanyiserver.task.OpcUaTimerTask;
|
|
|
import com.judong.chuanyiserver.util.*;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
|
|
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
|
|
|
+import org.eclipse.milo.opcua.stack.core.AttributeId;
|
|
|
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
|
|
|
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
|
|
|
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadResponse;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadResult;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadValueId;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.ReadRawModifiedDetails;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.structured.*;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.scheduling.annotation.AsyncResult;
|
|
|
import org.springframework.stereotype.Component;
|
|
@@ -46,23 +47,6 @@ public class OpcAsyncTask {
|
|
|
@Resource
|
|
|
private RedisUtil redisUtil;
|
|
|
|
|
|
- public void testA() {
|
|
|
- log.info("异步任务A在执行,时间是:" + System.currentTimeMillis());
|
|
|
- }
|
|
|
-
|
|
|
- public void testB() {
|
|
|
- log.info("异步任务B在执行,时间是:" + System.currentTimeMillis());
|
|
|
- }
|
|
|
-
|
|
|
- public void testXXX() {
|
|
|
- Timer timer = new Timer();
|
|
|
- timer.schedule(new OpcUaTimerTask(), 1000, 2000);
|
|
|
- }
|
|
|
-
|
|
|
- public Future<String> testC(String xx) {
|
|
|
- return new AsyncResult(xx);
|
|
|
- }
|
|
|
-
|
|
|
//异步读取OpcUa
|
|
|
public void runOpcUa(ItemGroup itemGroup, DataSource dataSource, Map<String, DataModel> map) {
|
|
|
Integer id = itemGroup.getId();
|
|
@@ -125,13 +109,86 @@ public class OpcAsyncTask {
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
String message = e.getMessage();
|
|
|
+ String messageContent = "";
|
|
|
+ if (message.contains("Bad_ConnectionRejected")) {
|
|
|
+ messageContent = "ip连接不可用";
|
|
|
+ } else if (message.contains("Bad_UserAccessDenied")) {
|
|
|
+ messageContent = "用户无权执行请求的操作";
|
|
|
+ } else {
|
|
|
+ messageContent = e.getMessage();
|
|
|
+ }
|
|
|
+ redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + messageContent));
|
|
|
+ } finally {
|
|
|
+ if (Blank.isNotEmpty(opcUaClient)) {
|
|
|
+ opcUaClient.disconnect();
|
|
|
+ redisUtil.del(ConstantStr.ITEM_GROUP + id);
|
|
|
+ itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //异步读取OpcUa
|
|
|
+ public void runOpcUa1(ItemGroup itemGroup, DataSource dataSource, Map<String, DataModel> map) {
|
|
|
+ Integer id = itemGroup.getId();
|
|
|
+ DataSourceType dataSourceType = dataSourceDao.getDataSourceTypeById(dataSource.getTypeId());
|
|
|
+ OpcUaClient opcUaClient = null;
|
|
|
+ List<com.judong.chuanyiserver.entity.Item> itemList = itemGroupDao.getItemListByGroupId(id);
|
|
|
+ try {
|
|
|
+ opcUaClient = OpcUaUtil.createClient(dataSource);
|
|
|
+ opcUaClient.connect().get();
|
|
|
+// List<NodeId> nodeIdList = OpcUaUtil.genNodeId(itemList);
|
|
|
+ if (dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_REAL.getValue())) {
|
|
|
+ if (itemGroup.getReadMode() == ConstantStr.ON_FREQUENCY) {
|
|
|
+ //创建发布间隔为1000ms的订阅对象
|
|
|
+ UaSubscription uaSubscription = opcUaClient.getSubscriptionManager().createSubscription(1000.0).get();
|
|
|
+ // 监控项请求列表
|
|
|
+ List<MonitoredItemCreateRequest> requests = new ArrayList<>();
|
|
|
+ for (Item item : itemList) {
|
|
|
+ // 创建监控的参数
|
|
|
+ MonitoringParameters parameters = new MonitoringParameters(uaSubscription.nextClientHandle(),
|
|
|
+ 1000.0, // 取样间隔
|
|
|
+ null, // 过滤器,null表示使用默认值
|
|
|
+ Unsigned.uint(10), // 队列大小
|
|
|
+ true // 丢弃旧的
|
|
|
+ );
|
|
|
+ // 创建订阅的变量, 创建监控项请 求
|
|
|
+ MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
|
|
|
+ new ReadValueId(new NodeId(item.getNodeIndex(), item.getItemReadName()), AttributeId.Value.uid(),
|
|
|
+ null, null),
|
|
|
+ MonitoringMode.Reporting, parameters);
|
|
|
+ requests.add(request);
|
|
|
+ }
|
|
|
+ // 创建监控项,并且注册变量值改变时候的回调函数
|
|
|
+ uaSubscription.createMonitoredItems(TimestampsToReturn.Both, requests, (item, id1) -> {
|
|
|
+ item.setValueConsumer((i, v) -> {
|
|
|
+
|
|
|
+ log.info("item={}, value={}", i.getReadValueId().getNodeId(), v.getValue());
|
|
|
+ log.info("时间戳={}, value={}", i.getTimestamps(), v.getValue());
|
|
|
+ });
|
|
|
+ }).get();
|
|
|
+ } else if (itemGroup.getReadMode() == ConstantStr.ON_CHANGE) {
|
|
|
+
|
|
|
+ } else if (itemGroup.getReadMode() == ConstantStr.EXCEED_SET_VALUE) {
|
|
|
+
|
|
|
+ } else if (itemGroup.getReadMode() == ConstantStr.LOWER_SET_VALUE) {
|
|
|
+
|
|
|
+ } else {
|
|
|
+ throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "目前未适配其他类型的读取模式");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new CustomException(ResultEnum.SERVER_ERROR.getRespCode(), "目前还没有此种类型的连接方式");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ String message = e.getMessage();
|
|
|
+ String messageContent = "";
|
|
|
if (message.contains("Bad_ConnectionRejected")) {
|
|
|
- redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + "ip连接不可用"));
|
|
|
+ messageContent = "ip连接不可用";
|
|
|
} else if (message.contains("Bad_UserAccessDenied")) {
|
|
|
- redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + "用户无权执行请求的操作"));
|
|
|
+ messageContent = "用户无权执行请求的操作";
|
|
|
} else {
|
|
|
- redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + e.getMessage()));
|
|
|
+ messageContent = e.getMessage();
|
|
|
}
|
|
|
+ redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + messageContent));
|
|
|
} finally {
|
|
|
if (Blank.isNotEmpty(opcUaClient)) {
|
|
|
opcUaClient.disconnect();
|