Переглянути джерело

因为IFIX,WINCC,横河读取没得问题,所以将opcda读取版本测回,并保留新版本备份

zhoupeng 1 рік тому
батько
коміт
c8c1ee1605

+ 53 - 45
chaunyi_opc/opc_da/src/main/java/com/example/opc_da/task/OpcDaChangeTask.java

@@ -94,20 +94,9 @@ public class OpcDaChangeTask extends TimerTask {
                             ConstantStr.NO_READ));
                     Timer listenerTimer = new Timer();
                     listenerTimer.schedule(new TimerTask() {
-                        private int sum = 0;
-
                         @Override
                         public void run() {
                             try {
-                                sum++;
-                                if (sum > 3) {
-                                    redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
-                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
-                                            "3次服务重新连接失败",
-                                            ConstantStr.NO_READ));
-                                    listenerTimer.cancel();
-                                }
                                 server.connect();
                                 if (Blank.isNotEmpty(server.getServerState()) && server.getServerState().getServerState() == OPCSERVERSTATE.OPC_STATUS_RUNNING) {
                                     messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
@@ -117,16 +106,16 @@ public class OpcDaChangeTask extends TimerTask {
                                     listenerTimer.cancel();
                                 }
                             } catch (Exception e) {
-                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
-                                String message = OpcDaUtil.genException(e.getMessage());
-                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
-                                        message,
-                                        ConstantStr.NO_READ));
-                                listenerTimer.cancel();
+//                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+//                                String message = OpcDaUtil.genException(e.getMessage());
+//                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+//                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+//                                        message,
+//                                        ConstantStr.NO_READ));
+//                                listenerTimer.cancel();
                             }
                         }
-                    }, 10000);
+                    }, 60000);
                 }
             });
         } catch (Exception e) {
@@ -150,7 +139,6 @@ public class OpcDaChangeTask extends TimerTask {
                     private BigDecimal oldValue;
                     private String sqlCurrentYmdh = "";
                     private Long index = 0L;
-                    private Long timeStamp = 0L;
 
                     @Override
                     public void changed(Item item1, ItemState itemState) {
@@ -163,49 +151,69 @@ public class OpcDaChangeTask extends TimerTask {
                             Object value = val.get("value");
                             //值对应取值的时间
                             Date time = itemState.getTimestamp().getTime();
-                            long time1 = time.getTime();
                             String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
                             String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
                             if (Blank.isEmpty(sqlCurrentYmdh)) {
                                 sqlCurrentYmdh = currentYmdh;
-                                timeStamp = time1;
                                 redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
                             } else {
-                                //如果上次时间戳和这次时间戳相等,说明值未发生变化,不需要进行下面的操作
-                                if (timeStamp == time1) {
-                                    return;
-                                }
                                 if (!sqlCurrentYmdh.equals(currentYmdh)) {
                                     //组装相应的原始数据
                                     opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                                     redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
                                     sqlCurrentYmdh = currentYmdh;
-                                    timeStamp = time1;
                                 }
                             }
+                            String valueStr = value.toString();
                             if (javaType.toLowerCase().equals("boolean")) {
-                                Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
-                                CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData), JSON.toJSONString(dmData),
-                                        currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
-                                opcAsyncTask.addCursorRawData(cursorRawData);
-                            } else {
-                                try {
-                                    BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
-                                    BigDecimal dmData = Blank.isNotEmpty(dm) ?
-                                            MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
-                                            bigDecimal;
-                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
-                                            currentYmdhmss, index, currentYmdh,
-                                            Blank.isEmpty(oldValue) ? ConstantStr.IS_RECORD : MathUtil.isMeetChange(oldValue,
-                                                    dmData, new BigDecimal(modeValue), readModeType), new Date());
+                                Boolean data = JSON.parseObject(valueStr, Boolean.class);
+                                if (Blank.isNotEmpty(dm) && dm.getModelType().equals(ConstantStr.VALUE_REPLACE) && dm.getOperationRule().equals(valueStr)) {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(data), dm.getReplacingValue(),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
                                     opcAsyncTask.addCursorRawData(cursorRawData);
-                                    oldValue = dmData;
-                                } catch (Exception e) {
-                                    String valueStr = value.toString();
-                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, "".equals(valueStr) ? ConstantStr.STRING_EMPTY : valueStr,
+                                } else {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(data), JSON.toJSONString(data),
                                             currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
                                     opcAsyncTask.addCursorRawData(cursorRawData);
                                 }
+                            } else {
+                                try {
+                                    BigDecimal bigDecimal = JSON.parseObject(valueStr, BigDecimal.class);
+                                    if (Blank.isNotEmpty(dm) && dm.getModelType().equals(ConstantStr.VALUE_REPLACE) && dm.getOperationRule().equals(valueStr)) {
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), dm.getReplacingValue(),
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    } else {
+                                        BigDecimal dmData = Blank.isNotEmpty(dm) ?
+                                                MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
+                                                bigDecimal;
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
+                                                currentYmdhmss, index, currentYmdh,
+                                                Blank.isEmpty(oldValue) ? ConstantStr.IS_RECORD : MathUtil.isMeetChange(oldValue,
+                                                        dmData, new BigDecimal(modeValue), readModeType), new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                        oldValue = dmData;
+                                    }
+//                                    BigDecimal dmData = Blank.isNotEmpty(dm) ?
+//                                            MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
+//                                            bigDecimal;
+//                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
+//                                            currentYmdhmss, index, currentYmdh,
+//                                            Blank.isEmpty(oldValue) ? ConstantStr.IS_RECORD : MathUtil.isMeetChange(oldValue,
+//                                                    dmData, new BigDecimal(modeValue), readModeType), new Date());
+//                                    opcAsyncTask.addCursorRawData(cursorRawData);
+//                                    oldValue = dmData;
+                                } catch (Exception e) {
+                                    if (Blank.isNotEmpty(dm) && dm.getModelType().equals(ConstantStr.VALUE_REPLACE) && dm.getOperationRule().equals(valueStr)) {
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, dm.getReplacingValue(),
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    } else {
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, "".equals(valueStr) ? ConstantStr.STRING_EMPTY : valueStr,
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    }
+                                }
                             }
                         } catch (Exception e) {
                             //执行组装数据库的原始数据

+ 310 - 0
chaunyi_opc/opc_da/src/main/java/com/example/opc_da/task/OpcDaChangeTask1.java

@@ -0,0 +1,310 @@
+package com.example.opc_da.task;
+
+import com.alibaba.fastjson.JSON;
+import com.example.opc_common.entity.*;
+import com.example.opc_common.util.Blank;
+import com.example.opc_common.util.ConstantStr;
+import com.example.opc_common.util.DateUtil;
+import com.example.opc_common.util.MathUtil;
+import com.example.opc_da.dao.ItemGroupDao;
+import com.example.opc_da.dao.MessageNoticeDao;
+import com.example.opc_da.dynamicSchedule.CronTaskRegister;
+import com.example.opc_da.util.OpcDaUtil;
+import com.example.opc_da.util.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.openscada.opc.dcom.da.OPCSERVERSTATE;
+import org.openscada.opc.lib.da.Item;
+import org.openscada.opc.lib.da.*;
+
+import java.math.BigDecimal;
+import java.util.*;
+
+@Slf4j
+public class OpcDaChangeTask1 extends TimerTask {
+
+    private final RedisUtil redisUtil;
+
+    private final OpcAsyncTask opcAsyncTask;
+
+    private final String cronId;
+
+    private final CronTaskRegister cronTaskRegister;
+
+    private final ItemGroupDao itemGroupDao;
+
+    private final MessageNoticeDao messageNoticeDao;
+
+    private final Timer timer;
+
+    private final ItemGroup itemGroup;
+
+    private final DataSource dataSource;
+
+    private final Map<String, DataModel> map;
+
+    private final List<com.example.opc_common.entity.Item> itemList;
+
+    private final String timeFormat;
+
+    private final Long endTime;
+
+    public OpcDaChangeTask1(RedisUtil redisUtil,
+                            OpcAsyncTask opcAsyncTask,
+                            String cronId,
+                            CronTaskRegister cronTaskRegister,
+                            ItemGroupDao itemGroupDao,
+                            MessageNoticeDao messageNoticeDao,
+                            Timer timer,
+                            ItemGroup itemGroup,
+                            DataSource dataSource,
+                            Map<String, DataModel> map,
+                            List<com.example.opc_common.entity.Item> itemList,
+                            String timeFormat,
+                            Long endTime) {
+        this.redisUtil = redisUtil;
+        this.opcAsyncTask = opcAsyncTask;
+        this.cronId = cronId;
+        this.cronTaskRegister = cronTaskRegister;
+        this.itemGroupDao = itemGroupDao;
+        this.messageNoticeDao = messageNoticeDao;
+        this.timer = timer;
+        this.itemGroup = itemGroup;
+        this.dataSource = dataSource;
+        this.map = map;
+        this.itemList = itemList;
+        this.timeFormat = timeFormat;
+        this.endTime = endTime;
+    }
+
+    @Override
+    public void run() {
+        Server server = OpcDaUtil.createServer(dataSource);
+        Integer id = itemGroup.getId();
+        Integer readModeType = itemGroup.getReadModeType();
+        Double modeValue = itemGroup.getModeValue();
+        Integer dataSourceId = dataSource.getId();
+        SyncAccess access = null;
+        try {
+            server.connect();
+            server.addStateListener(connected -> {
+                if (!connected) {
+                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务断开",
+                            "服务断开,马山进行重新连接",
+                            ConstantStr.NO_READ));
+                    Timer listenerTimer = new Timer();
+                    listenerTimer.schedule(new TimerTask() {
+                        private int sum = 0;
+
+                        @Override
+                        public void run() {
+                            try {
+                                sum++;
+                                if (sum > 3) {
+                                    redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+                                            "3次服务重新连接失败",
+                                            ConstantStr.NO_READ));
+                                    listenerTimer.cancel();
+                                }
+                                server.connect();
+                                if (Blank.isNotEmpty(server.getServerState()) && server.getServerState().getServerState() == OPCSERVERSTATE.OPC_STATUS_RUNNING) {
+                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接成功",
+                                            "服务重新连接成功",
+                                            ConstantStr.NO_READ));
+                                    listenerTimer.cancel();
+                                }
+                            } catch (Exception e) {
+                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+                                String message = OpcDaUtil.genException(e.getMessage());
+                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+                                        message,
+                                        ConstantStr.NO_READ));
+                                listenerTimer.cancel();
+                            }
+                        }
+                    }, 10000);
+                }
+            });
+        } catch (Exception e) {
+            String message = OpcDaUtil.genException(e.getMessage());
+            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
+                    message,
+                    ConstantStr.NO_READ));
+            if (Blank.isNotEmpty(server)) {
+                server.dispose();
+            }
+            itemGroupDao.stopItemGroupById(itemGroup.getId(), ConstantStr.EXCEPT_STOP_UP);
+            cronTaskRegister.removeCronTask(cronId);
+            timer.cancel();
+        }
+        try {
+            access = new SyncAccess(server, 500);
+            for (com.example.opc_common.entity.Item item : itemList) {
+                String itemId = item.getItemReadName();
+                access.addItem(itemId, new DataCallback() {
+                    private BigDecimal oldValue;
+                    private String sqlCurrentYmdh = "";
+                    private Long index = 0L;
+                    private Long timeStamp = 0L;
+
+                    @Override
+                    public void changed(Item item1, ItemState itemState) {
+                        index++;
+                        try {
+                            Map<String, Object> val = OpcDaUtil.getVal(itemState.getValue());
+                            DataModel dm = map.get(itemId);
+                            //读取的值
+                            String javaType = String.valueOf(val.get("javaType"));
+                            Object value = val.get("value");
+                            //值对应取值的时间
+                            Date time = itemState.getTimestamp().getTime();
+                            long time1 = time.getTime();
+                            String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
+                            String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
+                            if (Blank.isEmpty(sqlCurrentYmdh)) {
+                                sqlCurrentYmdh = currentYmdh;
+                                timeStamp = time1;
+                                redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                            } else {
+                                //如果上次时间戳和这次时间戳相等,说明值未发生变化,不需要进行下面的操作
+                                if (timeStamp == time1) {
+                                    return;
+                                }
+                                if (!sqlCurrentYmdh.equals(currentYmdh)) {
+                                    //组装相应的原始数据
+                                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                                    redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                                    sqlCurrentYmdh = currentYmdh;
+                                    timeStamp = time1;
+                                }
+                            }
+                            if (javaType.toLowerCase().equals("boolean")) {
+                                Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
+                                CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData), JSON.toJSONString(dmData),
+                                        currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
+                                opcAsyncTask.addCursorRawData(cursorRawData);
+                            } else {
+                                try {
+                                    BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
+                                    BigDecimal dmData = Blank.isNotEmpty(dm) ?
+                                            MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
+                                            bigDecimal;
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh,
+                                            Blank.isEmpty(oldValue) ? ConstantStr.IS_RECORD : MathUtil.isMeetChange(oldValue,
+                                                    dmData, new BigDecimal(modeValue), readModeType), new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                    oldValue = dmData;
+                                } catch (Exception e) {
+                                    String valueStr = value.toString();
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, "".equals(valueStr) ? ConstantStr.STRING_EMPTY : valueStr,
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.NOT_RECORD, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                }
+                            }
+                        } catch (Exception e) {
+                            //执行组装数据库的原始数据
+                            opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                            redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+
+                            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行停止",
+                                    e.getMessage(),
+                                    ConstantStr.NO_READ));
+                            if (Blank.isNotEmpty(server)) {
+                                server.dispose();
+                            }
+                            itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
+                            cronTaskRegister.removeCronTask(cronId);
+                            timer.cancel();
+                        }
+                    }
+                });
+            }
+            access.bind();
+            while (true) {
+                Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
+                if (Blank.isEmpty(flage)) {
+                    flage = false;
+                }
+                if (!access.isActive()) {
+                    //执行组装数据库的数据,以及生成驱动报表
+                    String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                    redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
+                            "服务异常停止了",
+                            ConstantStr.NO_READ));
+                    if (Blank.isNotEmpty(access)) {
+                        access.clear();
+                    }
+                    if (Blank.isNotEmpty(server)) {
+                        server.dispose();
+                    }
+                    itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
+                    cronTaskRegister.removeCronTask(cronId);
+                    redisUtil.del(ConstantStr.ITEM_GROUP + id);
+                    timer.cancel();
+                    break;
+                }
+                if (!flage) {
+                    //执行组装数据库的数据,以及生成驱动报表
+                    String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                    redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                    if (Blank.isNotEmpty(access)) {
+                        access.clear();
+                    }
+                    if (Blank.isNotEmpty(server)) {
+                        server.dispose();
+                    }
+                    cronTaskRegister.removeCronTask(cronId);
+                    redisUtil.del(ConstantStr.ITEM_GROUP + id);
+                    timer.cancel();
+                    break;
+                }
+                if (System.currentTimeMillis() >= endTime) {
+                    //执行组装数据库的数据,以及生成驱动报表
+                    String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                    redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+
+                    if (Blank.isNotEmpty(access)) {
+                        access.clear();
+                    }
+                    if (Blank.isNotEmpty(server)) {
+                        server.dispose();
+                    }
+                    timer.cancel();
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
+            if (Blank.isEmpty(flage)) {
+                flage = false;
+            }
+            if (!flage) {
+                //执行组装数据库的数据,以及生成驱动报表
+                String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                if (Blank.isNotEmpty(access)) {
+                    access.clear();
+                }
+                if (Blank.isNotEmpty(server)) {
+                    server.dispose();
+                }
+                itemGroupDao.stopItemGroupById(itemGroup.getId(), ConstantStr.EXCEPT_STOP_UP);
+                cronTaskRegister.removeCronTask(cronId);
+                timer.cancel();
+            }
+        }
+    }
+}

+ 8 - 19
chaunyi_opc/opc_da/src/main/java/com/example/opc_da/task/OpcDaExceedTask.java

@@ -92,20 +92,9 @@ public class OpcDaExceedTask extends TimerTask {
                             ConstantStr.NO_READ));
                     Timer listenerTimer = new Timer();
                     listenerTimer.schedule(new TimerTask() {
-                        private int sum = 0;
-
                         @Override
                         public void run() {
                             try {
-                                sum++;
-                                if (sum > 3) {
-                                    redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
-                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
-                                            "3次服务重新连接失败",
-                                            ConstantStr.NO_READ));
-                                    listenerTimer.cancel();
-                                }
                                 server.connect();
                                 if (Blank.isNotEmpty(server.getServerState()) && server.getServerState().getServerState() == OPCSERVERSTATE.OPC_STATUS_RUNNING) {
                                     messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
@@ -115,16 +104,16 @@ public class OpcDaExceedTask extends TimerTask {
                                     listenerTimer.cancel();
                                 }
                             } catch (Exception e) {
-                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
-                                String message = OpcDaUtil.genException(e.getMessage());
-                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
-                                        message,
-                                        ConstantStr.NO_READ));
-                                listenerTimer.cancel();
+//                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+//                                String message = OpcDaUtil.genException(e.getMessage());
+//                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+//                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+//                                        message,
+//                                        ConstantStr.NO_READ));
+//                                listenerTimer.cancel();
                             }
                         }
-                    }, 10000);
+                    }, 60000);
                 }
             });
         }catch (Exception e){

+ 84 - 68
chaunyi_opc/opc_da/src/main/java/com/example/opc_da/task/OpcDaFrequencyTask.java

@@ -12,7 +12,6 @@ import com.example.opc_da.dynamicSchedule.CronTaskRegister;
 import com.example.opc_da.util.OpcDaUtil;
 import com.example.opc_da.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.jinterop.dcom.common.JIException;
 import org.openscada.opc.dcom.da.OPCSERVERSTATE;
 import org.openscada.opc.lib.da.Item;
 import org.openscada.opc.lib.da.*;
@@ -82,7 +81,7 @@ public class OpcDaFrequencyTask extends TimerTask {
         Server server = OpcDaUtil.createServer(dataSource);
         Integer id = itemGroup.getId();
         Integer dataSourceId = dataSource.getId();
-        Timer chTimer = new Timer();
+        SyncAccess access = null;
         try {
             server.connect();
             server.addStateListener(connected -> {
@@ -93,20 +92,9 @@ public class OpcDaFrequencyTask extends TimerTask {
                             ConstantStr.NO_READ));
                     Timer listenerTimer = new Timer();
                     listenerTimer.schedule(new TimerTask() {
-                        private int sum = 0;
-
                         @Override
                         public void run() {
                             try {
-                                sum++;
-                                if (sum > 3) {
-                                    redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
-                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
-                                            "3次服务重新连接失败",
-                                            ConstantStr.NO_READ));
-                                    listenerTimer.cancel();
-                                }
                                 server.connect();
                                 if (Blank.isNotEmpty(server.getServerState()) && server.getServerState().getServerState() == OPCSERVERSTATE.OPC_STATUS_RUNNING) {
                                     messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
@@ -116,16 +104,16 @@ public class OpcDaFrequencyTask extends TimerTask {
                                     listenerTimer.cancel();
                                 }
                             } catch (Exception e) {
-                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
-                                String message = OpcDaUtil.genException(e.getMessage());
-                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
-                                        message,
-                                        ConstantStr.NO_READ));
-                                listenerTimer.cancel();
+//                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+//                                String message = OpcDaUtil.genException(e.getMessage());
+//                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+//                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+//                                        message,
+//                                        ConstantStr.NO_READ));
+//                                listenerTimer.cancel();
                             }
                         }
-                    }, 10000);
+                    }, 60000);
                 }
             });
         } catch (Exception e) {
@@ -139,37 +127,21 @@ public class OpcDaFrequencyTask extends TimerTask {
             }
             itemGroupDao.stopItemGroupById(itemGroup.getId(), ConstantStr.EXCEPT_STOP_UP);
             cronTaskRegister.removeCronTask(cronId);
-            chTimer.cancel();
             timer.cancel();
         }
         try {
-            Group group = server.addGroup();
-            List<String> itemStrList = new ArrayList<>();
+            access = new SyncAccess(server, (int) (Math.round(itemGroup.getModeValue() * 1000)));
+//            AccessBase access = new Async20Access(server, (int) (itemGroup.getModeValue() * 1000), true);
             for (com.example.opc_common.entity.Item item : itemList) {
-                itemStrList.add(item.getItemReadName());
-            }
-            String[] items = itemStrList.toArray(new String[]{});
-            Map<String, Item> itemResult = group.addItems(items);
-            Set itemSet = new HashSet(itemResult.values());
-            org.openscada.opc.lib.da.Item[] itemArr = new org.openscada.opc.lib.da.Item[itemSet.size()];
-            itemSet.toArray(itemArr);
+                String itemId = item.getItemReadName();
+                access.addItem(itemId, new DataCallback() {
 
-            chTimer.schedule(new TimerTask() {
-                private String sqlCurrentYmdh = "";
-                private Long index = 0L;
+                    private String sqlCurrentYmdh = "";
+                    private Long index = 0L;
 
-                @Override
-                public void run() {
-                    index++;
-                    Map<Item, ItemState> resultMap = null;
-                    try {
-                        resultMap = group.read(true, itemArr);
-                    } catch (JIException e) {
-                        throw new RuntimeException(e);
-                    }
-                    for (Item item : resultMap.keySet()) {
-                        String itemId = item.getId();
-                        ItemState itemState = resultMap.get(item);
+                    @Override
+                    public void changed(Item item1, ItemState itemState) {
+                        index++;
                         try {
                             Map<String, Object> val = OpcDaUtil.getVal(itemState.getValue());
                             DataModel dm = map.get(itemId);
@@ -191,29 +163,47 @@ public class OpcDaFrequencyTask extends TimerTask {
                                     sqlCurrentYmdh = currentYmdh;
                                 }
                             }
+                            String valueStr = value.toString();
                             if (javaType.toLowerCase().equals("boolean")) {
                                 //存数据
-                                Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
-                                CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData), JSON.toJSONString(dmData),
-                                        currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
-                                opcAsyncTask.addCursorRawData(cursorRawData);
-                            } else {
-                                try {
-                                    BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
-                                    BigDecimal dmData = Blank.isNotEmpty(dm) ?
-                                            MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
-                                            bigDecimal;
-                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
+                                Boolean data = JSON.parseObject(valueStr, Boolean.class);
+                                if (Blank.isNotEmpty(dm) && dm.getModelType().equals(ConstantStr.VALUE_REPLACE) && dm.getOperationRule().equals(valueStr)) {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(data), dm.getReplacingValue(),
                                             currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
                                     opcAsyncTask.addCursorRawData(cursorRawData);
-                                } catch (Exception e) {
-                                    String valueStr = value.toString();
-                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, "".equals(valueStr) ? ConstantStr.STRING_EMPTY : valueStr,
+                                } else {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(data), JSON.toJSONString(data),
                                             currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
                                     opcAsyncTask.addCursorRawData(cursorRawData);
                                 }
+                            } else {
+                                try {
+                                    BigDecimal bigDecimal = JSON.parseObject(valueStr, BigDecimal.class);
+                                    if (Blank.isNotEmpty(dm) && dm.getModelType().equals(ConstantStr.VALUE_REPLACE) && dm.getOperationRule().equals(valueStr)) {
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), dm.getReplacingValue(),
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    } else {
+                                        BigDecimal dmData = Blank.isNotEmpty(dm) ?
+                                                MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
+                                                bigDecimal;
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    }
+                                } catch (Exception e) {
+                                    if (Blank.isNotEmpty(dm) && dm.getModelType().equals(ConstantStr.VALUE_REPLACE) && dm.getOperationRule().equals(valueStr)) {
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, dm.getReplacingValue(),
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    } else {
+                                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, "".equals(valueStr) ? ConstantStr.STRING_EMPTY : valueStr,
+                                                currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                        opcAsyncTask.addCursorRawData(cursorRawData);
+                                    }
+                                }
                             }
-                        } catch (JIException e) {
+                        } catch (Exception e) {
                             //执行组装数据库的数据,以及生成驱动报表
                             opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                             redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
@@ -228,29 +218,51 @@ public class OpcDaFrequencyTask extends TimerTask {
                             itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
                             cronTaskRegister.removeCronTask(cronId);
                             redisUtil.del(ConstantStr.ITEM_GROUP + id);
-                            chTimer.cancel();
                             timer.cancel();
                         }
                     }
-                }
-            }, new Date(), (int) (Math.round(itemGroup.getModeValue() * 1000)));
-
+                });
+            }
+            access.bind();
             while (true) {
                 Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
                 if (Blank.isEmpty(flage)) {
                     flage = false;
                 }
+                if (!access.isActive()) {
+                    //执行组装数据库的数据,以及生成驱动报表
+                    String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                    redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
+                            "服务异常停止了",
+                            ConstantStr.NO_READ));
+                    if (Blank.isNotEmpty(access)) {
+                        access.clear();
+                    }
+                    if (Blank.isNotEmpty(server)) {
+                        server.dispose();
+                    }
+                    itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
+                    cronTaskRegister.removeCronTask(cronId);
+                    redisUtil.del(ConstantStr.ITEM_GROUP + id);
+                    timer.cancel();
+                    break;
+                }
                 if (!flage) {
                     //执行组装数据库的数据,以及生成驱动报表
                     String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
                     opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                     redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                    if (Blank.isNotEmpty(access)) {
+                        access.clear();
+                    }
                     if (Blank.isNotEmpty(server)) {
                         server.dispose();
                     }
                     cronTaskRegister.removeCronTask(cronId);
                     redisUtil.del(ConstantStr.ITEM_GROUP + id);
-                    chTimer.cancel();
                     timer.cancel();
                     break;
                 }
@@ -260,10 +272,12 @@ public class OpcDaFrequencyTask extends TimerTask {
                     opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                     redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
 
+                    if (Blank.isNotEmpty(access)) {
+                        access.clear();
+                    }
                     if (Blank.isNotEmpty(server)) {
                         server.dispose();
                     }
-                    chTimer.cancel();
                     timer.cancel();
                     break;
                 }
@@ -278,12 +292,14 @@ public class OpcDaFrequencyTask extends TimerTask {
                 String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
                 opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                 redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                if (Blank.isNotEmpty(access)) {
+                    access.clear();
+                }
                 if (Blank.isNotEmpty(server)) {
                     server.dispose();
                 }
                 itemGroupDao.stopItemGroupById(itemGroup.getId(), ConstantStr.EXCEPT_STOP_UP);
                 cronTaskRegister.removeCronTask(cronId);
-                chTimer.cancel();
                 timer.cancel();
             }
         }

+ 293 - 0
chaunyi_opc/opc_da/src/main/java/com/example/opc_da/task/OpcDaFrequencyTask1.java

@@ -0,0 +1,293 @@
+package com.example.opc_da.task;
+
+import com.alibaba.fastjson.JSON;
+import com.example.opc_common.entity.*;
+import com.example.opc_common.util.Blank;
+import com.example.opc_common.util.ConstantStr;
+import com.example.opc_common.util.DateUtil;
+import com.example.opc_common.util.MathUtil;
+import com.example.opc_da.dao.ItemGroupDao;
+import com.example.opc_da.dao.MessageNoticeDao;
+import com.example.opc_da.dynamicSchedule.CronTaskRegister;
+import com.example.opc_da.util.OpcDaUtil;
+import com.example.opc_da.util.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.jinterop.dcom.common.JIException;
+import org.openscada.opc.dcom.da.OPCSERVERSTATE;
+import org.openscada.opc.lib.da.Group;
+import org.openscada.opc.lib.da.Item;
+import org.openscada.opc.lib.da.ItemState;
+import org.openscada.opc.lib.da.Server;
+
+import java.math.BigDecimal;
+import java.util.*;
+
+@Slf4j
+public class OpcDaFrequencyTask1 extends TimerTask {
+
+    private final RedisUtil redisUtil;
+
+    private final OpcAsyncTask opcAsyncTask;
+
+    private final String cronId;
+
+    private final CronTaskRegister cronTaskRegister;
+
+    private final ItemGroupDao itemGroupDao;
+
+    private final MessageNoticeDao messageNoticeDao;
+
+    private final Timer timer;
+
+    private final ItemGroup itemGroup;
+
+    private final DataSource dataSource;
+
+    private final Map<String, DataModel> map;
+
+    private final List<com.example.opc_common.entity.Item> itemList;
+
+    private final String timeFormat;
+
+    private final Long endTime;
+
+    public OpcDaFrequencyTask1(RedisUtil redisUtil,
+                               OpcAsyncTask opcAsyncTask,
+                               String cronId,
+                               CronTaskRegister cronTaskRegister,
+                               ItemGroupDao itemGroupDao,
+                               MessageNoticeDao messageNoticeDao,
+                               Timer timer,
+                               ItemGroup itemGroup,
+                               DataSource dataSource,
+                               Map<String, DataModel> map,
+                               List<com.example.opc_common.entity.Item> itemList,
+                               String timeFormat,
+                               Long endTime) {
+        this.redisUtil = redisUtil;
+        this.opcAsyncTask = opcAsyncTask;
+        this.cronId = cronId;
+        this.cronTaskRegister = cronTaskRegister;
+        this.itemGroupDao = itemGroupDao;
+        this.messageNoticeDao = messageNoticeDao;
+        this.timer = timer;
+        this.itemGroup = itemGroup;
+        this.dataSource = dataSource;
+        this.map = map;
+        this.itemList = itemList;
+        this.timeFormat = timeFormat;
+        this.endTime = endTime;
+    }
+
+    @Override
+    public void run() {
+        Server server = OpcDaUtil.createServer(dataSource);
+        Integer id = itemGroup.getId();
+        Integer dataSourceId = dataSource.getId();
+        Timer chTimer = new Timer();
+        try {
+            server.connect();
+            server.addStateListener(connected -> {
+                if (!connected) {
+                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务断开",
+                            "服务断开,马山进行重新连接",
+                            ConstantStr.NO_READ));
+                    Timer listenerTimer = new Timer();
+                    listenerTimer.schedule(new TimerTask() {
+                        private int sum = 0;
+
+                        @Override
+                        public void run() {
+                            try {
+                                sum++;
+                                if (sum > 3) {
+                                    redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+                                            "3次服务重新连接失败",
+                                            ConstantStr.NO_READ));
+                                    listenerTimer.cancel();
+                                }
+                                server.connect();
+                                if (Blank.isNotEmpty(server.getServerState()) && server.getServerState().getServerState() == OPCSERVERSTATE.OPC_STATUS_RUNNING) {
+                                    messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                            itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接成功",
+                                            "服务重新连接成功",
+                                            ConstantStr.NO_READ));
+                                    listenerTimer.cancel();
+                                }
+                            } catch (Exception e) {
+                                redisUtil.set(ConstantStr.ITEM_GROUP + id, false);
+                                String message = OpcDaUtil.genException(e.getMessage());
+                                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                        itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "服务重新连接失败",
+                                        message,
+                                        ConstantStr.NO_READ));
+                                listenerTimer.cancel();
+                            }
+                        }
+                    }, 10000);
+                }
+            });
+        } catch (Exception e) {
+            String message = OpcDaUtil.genException(e.getMessage());
+            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
+                    message,
+                    ConstantStr.NO_READ));
+            if (Blank.isNotEmpty(server)) {
+                server.dispose();
+            }
+            itemGroupDao.stopItemGroupById(itemGroup.getId(), ConstantStr.EXCEPT_STOP_UP);
+            cronTaskRegister.removeCronTask(cronId);
+            chTimer.cancel();
+            timer.cancel();
+        }
+        try {
+            Group group = server.addGroup();
+            List<String> itemStrList = new ArrayList<>();
+            for (com.example.opc_common.entity.Item item : itemList) {
+                itemStrList.add(item.getItemReadName());
+            }
+            String[] items = itemStrList.toArray(new String[]{});
+            Map<String, Item> itemResult = group.addItems(items);
+            Set itemSet = new HashSet(itemResult.values());
+            Item[] itemArr = new Item[itemSet.size()];
+            itemSet.toArray(itemArr);
+
+            chTimer.schedule(new TimerTask() {
+                private String sqlCurrentYmdh = "";
+                private Long index = 0L;
+
+                @Override
+                public void run() {
+                    index++;
+                    Map<Item, ItemState> resultMap = null;
+                    try {
+                        resultMap = group.read(true, itemArr);
+                    } catch (JIException e) {
+                        throw new RuntimeException(e);
+                    }
+                    for (Item item : resultMap.keySet()) {
+                        String itemId = item.getId();
+                        ItemState itemState = resultMap.get(item);
+                        try {
+                            Map<String, Object> val = OpcDaUtil.getVal(itemState.getValue());
+                            DataModel dm = map.get(itemId);
+                            //读取的值
+                            String javaType = String.valueOf(val.get("javaType"));
+                            Object value = val.get("value");
+                            //值对应取值的时间
+                            Date time = itemState.getTimestamp().getTime();
+                            String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
+                            String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
+                            if (Blank.isEmpty(sqlCurrentYmdh)) {
+                                sqlCurrentYmdh = currentYmdh;
+                                redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                            } else {
+                                if (!sqlCurrentYmdh.equals(currentYmdh)) {
+                                    //组装相应的原始数据
+                                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                                    redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                                    sqlCurrentYmdh = currentYmdh;
+                                }
+                            }
+                            if (javaType.toLowerCase().equals("boolean")) {
+                                //存数据
+                                Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
+                                CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData), JSON.toJSONString(dmData),
+                                        currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                opcAsyncTask.addCursorRawData(cursorRawData);
+                            } else {
+                                try {
+                                    BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
+                                    BigDecimal dmData = Blank.isNotEmpty(dm) ?
+                                            MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
+                                            bigDecimal;
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(bigDecimal), JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                } catch (Exception e) {
+                                    String valueStr = value.toString();
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, valueStr, "".equals(valueStr) ? ConstantStr.STRING_EMPTY : valueStr,
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_RECORD, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                }
+                            }
+                        } catch (JIException e) {
+                            //执行组装数据库的数据,以及生成驱动报表
+                            opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                            redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+
+                            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
+                                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
+                                    e.getMessage(),
+                                    ConstantStr.NO_READ));
+                            if (Blank.isNotEmpty(server)) {
+                                server.dispose();
+                            }
+                            itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
+                            cronTaskRegister.removeCronTask(cronId);
+                            redisUtil.del(ConstantStr.ITEM_GROUP + id);
+                            chTimer.cancel();
+                            timer.cancel();
+                        }
+                    }
+                }
+            }, new Date(), (int) (Math.round(itemGroup.getModeValue() * 1000)));
+
+            while (true) {
+                Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
+                if (Blank.isEmpty(flage)) {
+                    flage = false;
+                }
+                if (!flage) {
+                    //执行组装数据库的数据,以及生成驱动报表
+                    String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                    redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                    if (Blank.isNotEmpty(server)) {
+                        server.dispose();
+                    }
+                    cronTaskRegister.removeCronTask(cronId);
+                    redisUtil.del(ConstantStr.ITEM_GROUP + id);
+                    chTimer.cancel();
+                    timer.cancel();
+                    break;
+                }
+                if (System.currentTimeMillis() >= endTime) {
+                    //执行组装数据库的数据,以及生成驱动报表
+                    String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                    opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                    redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+
+                    if (Blank.isNotEmpty(server)) {
+                        server.dispose();
+                    }
+                    chTimer.cancel();
+                    timer.cancel();
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
+            if (Blank.isEmpty(flage)) {
+                flage = false;
+            }
+            if (!flage) {
+                //执行组装数据库的数据,以及生成驱动报表
+                String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
+                opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
+                redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                if (Blank.isNotEmpty(server)) {
+                    server.dispose();
+                }
+                itemGroupDao.stopItemGroupById(itemGroup.getId(), ConstantStr.EXCEPT_STOP_UP);
+                cronTaskRegister.removeCronTask(cronId);
+                chTimer.cancel();
+                timer.cancel();
+            }
+        }
+    }
+}