|
@@ -4,14 +4,21 @@ import com.judong.chuanyiserver.dao.DataSourceDao;
|
|
|
import com.judong.chuanyiserver.dao.ItemGroupDao;
|
|
|
import com.judong.chuanyiserver.dao.RawDataDao;
|
|
|
import com.judong.chuanyiserver.entity.*;
|
|
|
+import com.judong.chuanyiserver.enums.DataSourceTypeEnum;
|
|
|
import com.judong.chuanyiserver.enums.ResultEnum;
|
|
|
import com.judong.chuanyiserver.exception.CustomException;
|
|
|
+import com.judong.chuanyiserver.task.KepServerTimerTask;
|
|
|
+import com.judong.chuanyiserver.task.OpcUaTimerTask;
|
|
|
import com.judong.chuanyiserver.util.*;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import lombok.val;
|
|
|
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
|
|
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
|
|
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadResponse;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadResult;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadValueId;
|
|
|
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadRawModifiedDetails;
|
|
|
import org.openscada.opc.dcom.da.OPCSERVERSTATE;
|
|
|
import org.openscada.opc.lib.da.Item;
|
|
|
import org.openscada.opc.lib.da.*;
|
|
@@ -22,13 +29,12 @@ import org.springframework.stereotype.Component;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
|
import static com.judong.chuanyiserver.util.KepOpcServerUtil.getVal;
|
|
|
|
|
|
@Component
|
|
|
-@Async("asyncThreadPoolTaskExecutor")
|
|
|
+@Async("threadPoolTaskExecutor")
|
|
|
@Slf4j
|
|
|
public class OpcAsyncTask {
|
|
|
|
|
@@ -54,61 +60,23 @@ public class OpcAsyncTask {
|
|
|
log.info("异步任务B在执行,时间是:" + System.currentTimeMillis());
|
|
|
}
|
|
|
|
|
|
- public Future<String> testC(String xx) {
|
|
|
- return new AsyncResult(xx);
|
|
|
- }
|
|
|
-
|
|
|
- public void KepServerReadItemList(ServerInformation serverInformation, List<ChannelSetting> channelSettingList) {
|
|
|
- log.info("KepServer:" + serverInformation.getIpAddress() + "开始读取数据");
|
|
|
- try {
|
|
|
- String opcServerDaPoolKey = KepOpcServerUtil.generateOpcPoolKey(serverInformation);
|
|
|
- if (KepOpcServerUtil.validationKey(opcServerDaPoolKey)) {
|
|
|
- Server server = KepOpcServerUtil.getServer(opcServerDaPoolKey);
|
|
|
- if (null == server.getServerState()) {
|
|
|
- throw new CustomException(ResultEnum.NOT_FOUND.getRespCode(), "连接失败");
|
|
|
- }
|
|
|
- if (OPCSERVERSTATE.OPC_STATUS_RUNNING == server.getServerState().getServerState()) {
|
|
|
- Group group = server.addGroup();
|
|
|
- group.addItems();
|
|
|
- //循环读值需要使用到的
|
|
|
- final AccessBase access = new SyncAccess(server, 1000);
|
|
|
- //开始读值
|
|
|
- access.bind();
|
|
|
- //停止读值
|
|
|
- access.unbind();
|
|
|
- Iterator<ChannelSetting> iterator = channelSettingList.iterator();
|
|
|
- while (iterator.hasNext()) {
|
|
|
- Item item = group.addItem(iterator.next().getChannelName());
|
|
|
- Map<String, Object> value = getVal(item.read(true).getValue());
|
|
|
- System.err.println(iterator.next().getChannelName() + ":" + value);
|
|
|
- }
|
|
|
- KepOpcServerUtil.returnServer(opcServerDaPoolKey, server);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- throw new CustomException(ResultEnum.REQUEST_TIME_OUT.getRespCode(), e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void KepServerCloseReadItemList(ServerInformation serverInformation, List<ChannelSetting> channelSettingList) {
|
|
|
- log.info("KepServer:" + serverInformation.getIpAddress() + "停止读取数据");
|
|
|
+ public void testXXX() {
|
|
|
+ Timer timer = new Timer();
|
|
|
+ timer.schedule(new OpcUaTimerTask(), 1000, 2000);
|
|
|
}
|
|
|
|
|
|
- public void OpcServerUaReadItemList(ServerInformation serverInformation, List<ChannelSetting> channelSettingList) {
|
|
|
- log.info("OpcServerUa:opc.tcp://" + serverInformation.getIpAddress() + ":" + serverInformation.getIpPort() + "开始读取数据");
|
|
|
- }
|
|
|
-
|
|
|
- public void OpcServerUaCloseReadItemList(ServerInformation serverInformation, List<ChannelSetting> channelSettingList) {
|
|
|
- log.info("OpcServerUa:opc.tcp://" + serverInformation.getIpAddress() + ":" + serverInformation.getIpPort() + "停止读取数据");
|
|
|
+ public Future<String> testC(String xx) {
|
|
|
+ return new AsyncResult(xx);
|
|
|
}
|
|
|
|
|
|
//异步读取kepserver
|
|
|
public void runKepServer(ItemGroup itemGroup, DataSource dataSource) {
|
|
|
Integer id = itemGroup.getId();
|
|
|
- Server server = KepOpcServerUtil.createServer(dataSource);
|
|
|
+ Server server = null;
|
|
|
List<String> itemList = itemGroupDao.getItemByIdChange(id);
|
|
|
String[] items = itemList.toArray(new String[]{});
|
|
|
try {
|
|
|
+ server = KepOpcServerUtil.createServer(dataSource);
|
|
|
server.connect();
|
|
|
if (null == server.getServerState()) {
|
|
|
throw new CustomException(ResultEnum.NOT_FOUND.getRespCode(), "连接失败");
|
|
@@ -139,45 +107,75 @@ public class OpcAsyncTask {
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
+ redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + e.getMessage()));
|
|
|
} finally {
|
|
|
- server.dispose();
|
|
|
+ if (Blank.isNotEmpty(server)) {
|
|
|
+ server.dispose();
|
|
|
+ }
|
|
|
redisUtil.del(ConstantStr.ITEM_GROUP + id);
|
|
|
itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//异步读取OpcUa
|
|
|
- public void runOpcUa(ItemGroup itemGroup, DataSource dataSource) throws Exception {
|
|
|
+ public void runOpcUa(ItemGroup itemGroup, DataSource dataSource) {
|
|
|
Integer id = itemGroup.getId();
|
|
|
- OpcUaClient opcUaClient = OpcUaUtil.createClient(dataSource);
|
|
|
+ DataSourceType dataSourceType = dataSourceDao.getDataSourceTypeById(dataSource.getTypeId());
|
|
|
+ OpcUaClient opcUaClient = null;
|
|
|
List<com.judong.chuanyiserver.entity.Item> itemList = itemGroupDao.getItemListByGroupId(id);
|
|
|
- List<NodeId> nodeIdList = OpcUaUtil.genNodeId(itemList);
|
|
|
try {
|
|
|
+ opcUaClient = OpcUaUtil.createClient(dataSource);
|
|
|
opcUaClient.connect().get();
|
|
|
- Boolean flage = true;
|
|
|
- while (flage) {
|
|
|
- List<RawData> rawDataList = new ArrayList<>();
|
|
|
- List<DataValue> valueList = opcUaClient.readValues(0.0, TimestampsToReturn.Neither, nodeIdList).get();
|
|
|
- Date date = new Date();
|
|
|
- for (DataValue dataValue : valueList) {
|
|
|
- StatusCode statusCode = dataValue.getStatusCode();
|
|
|
- if (Blank.isNotEmpty(statusCode)) {
|
|
|
- if (statusCode.isGood()) {
|
|
|
+ if (dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_REAL.getValue())) {
|
|
|
+ Boolean flage = true;
|
|
|
+ List<NodeId> nodeIdList = OpcUaUtil.genNodeId(itemList);
|
|
|
+ while (flage) {
|
|
|
+ List<RawData> rawDataList = new ArrayList<>();
|
|
|
+ List<DataValue> valueList = opcUaClient.readValues(0.0, TimestampsToReturn.Both, nodeIdList).get();
|
|
|
+ Date date = new Date();
|
|
|
+ if(Blank.isNotEmpty(nodeIdList)){
|
|
|
+ for (int i = 0; i < nodeIdList.size(); i++) {
|
|
|
+ DataValue dataValue = valueList.get(i);
|
|
|
Variant value = dataValue.getValue();
|
|
|
ExpandedNodeId expandedNodeId = value.getDataType().get();
|
|
|
- rawDataList.add(new RawData(itemGroup.getDataSourceId(), expandedNodeId.getIdentifier().toString(), expandedNodeId.getType().toString(), value.getValue().toString(), date));
|
|
|
+ rawDataList.add(new RawData(itemGroup.getDataSourceId(), nodeIdList.get(i).getIdentifier().toString(), expandedNodeId.getType().toString(), value.getValue().toString(), date));
|
|
|
}
|
|
|
}
|
|
|
+// for (DataValue dataValue : valueList) {
|
|
|
+// StatusCode statusCode = dataValue.getStatusCode();
|
|
|
+// if (Blank.isNotEmpty(statusCode)) {
|
|
|
+// if (statusCode.isGood()) {
|
|
|
+// Variant value = dataValue.getValue();
|
|
|
+// ExpandedNodeId expandedNodeId = value.getDataType().get();
|
|
|
+// rawDataList.add(new RawData(itemGroup.getDataSourceId(), expandedNodeId.getIdentifier().toString(), expandedNodeId.getType().toString(), value.getValue().toString(), date));
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+ addRawDataList(id, rawDataList);
|
|
|
+ Thread.sleep(itemGroup.getModeValue() * 1000);
|
|
|
+ flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
|
|
|
+ if (Blank.isEmpty(flage)) {
|
|
|
+ flage = false;
|
|
|
+ }
|
|
|
}
|
|
|
- addRawDataList(id, rawDataList);
|
|
|
- Thread.sleep(itemGroup.getModeValue() * 1000);
|
|
|
- flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
|
|
|
- if (Blank.isEmpty(flage)) {
|
|
|
- flage = false;
|
|
|
+ } else if (dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_HISTORY.getValue())) {
|
|
|
+ List<HistoryReadValueId> historyReadValueIdList = OpcUaUtil.genHisNodeId(itemList);
|
|
|
+ List<RawData> rawDataList = new ArrayList<>();
|
|
|
+ Date date = new Date();
|
|
|
+ HistoryReadResponse historyReadResponse = opcUaClient.historyRead(new ReadRawModifiedDetails(false, DateTime.MIN_VALUE, DateTime.MIN_VALUE, UInteger.MAX, false),
|
|
|
+ TimestampsToReturn.Both, true, historyReadValueIdList).get();
|
|
|
+ HistoryReadResult[] results = historyReadResponse.getResults();
|
|
|
+ for (HistoryReadResult historyReadResult : results) {
|
|
|
+ System.out.println(historyReadResult);
|
|
|
+// rawDataList.add(new RawData(itemGroup.getDataSourceId(), expandedNodeId.getIdentifier().toString(), expandedNodeId.getType().toString(), value.getValue().toString(), date));
|
|
|
}
|
|
|
+ addRawDataList(id, rawDataList);
|
|
|
+ } else {
|
|
|
+ throw new CustomException(ResultEnum.SERVER_ERROR.getRespCode(), "目前还没有此种类型的连接方式");
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
+ redisUtil.convertAndSend(ConstantStr.ITEM_GROUP, Result.no(ResultEnum.SERVER_ERROR.getRespCode(), "组" + itemGroup.getGroupName() + "运行异常,错误信息为:" + e.getMessage()));
|
|
|
} finally {
|
|
|
if (Blank.isNotEmpty(opcUaClient)) {
|
|
|
opcUaClient.disconnect();
|