Bläddra i källkod

完成opcua,通过频率读取数据组,变化率读取数据组,阈值读取数据组

zhoupeng 2 år sedan
förälder
incheckning
b31b1a37a3
21 ändrade filer med 814 tillägg och 864 borttagningar
  1. 0 5
      chaunyi_opc/opc_common/src/main/java/com/example/opc_common/entity/ItemGroup.java
  2. 4 15
      chaunyi_opc/opc_da/src/main/java/com/example/opc_da/service/impl/ItemGroupServiceImpl.java
  3. 10 7
      chaunyi_opc/opc_da/src/main/resources/mapper/ItemGroupDao.xml
  4. 1 91
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/config/InitRunner.java
  5. 61 0
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/config/SpringContextUtils.java
  6. 3 1
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dao/ItemGroupDao.java
  7. 8 1
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dao/RawDataDao.java
  8. 77 0
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/CronTaskRegister.java
  9. 81 0
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/DynamicScheduleConfig.java
  10. 24 0
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/ScheduleConfig.java
  11. 57 0
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/SchedulingRunnable.java
  12. 21 10
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/service/impl/ItemGroupServiceImpl.java
  13. 89 99
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcAsyncTask.java
  14. 53 51
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaChangeTask.java
  15. 80 77
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaExceedTask.java
  16. 56 107
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaFrequencyTask.java
  17. 0 196
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaLowerTask.java
  18. 128 111
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaTask.java
  19. 0 91
      chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaWeekTask.java
  20. 10 1
      chaunyi_opc/opc_ua/src/main/resources/mapper/ItemGroupDao.xml
  21. 51 1
      chaunyi_opc/opc_ua/src/main/resources/mapper/RawDataDao.xml

+ 0 - 5
chaunyi_opc/opc_common/src/main/java/com/example/opc_common/entity/ItemGroup.java

@@ -48,11 +48,6 @@ public class ItemGroup extends BaseSchedule implements Serializable {
      */
     private Double modeValue;
     /**
-     * 指定时间对应的模式值
-     */
-    private String modeValueTime;
-
-    /**
      * 读取周,0周天,1周一,2周二,...6周六
      */
     private String readWeek;

+ 4 - 15
chaunyi_opc/opc_da/src/main/java/com/example/opc_da/service/impl/ItemGroupServiceImpl.java

@@ -57,9 +57,6 @@ public class ItemGroupServiceImpl implements ItemGroupService {
     private DataModelDao dataModelDao;
 
     @Autowired
-    private MessageNoticeDao messageNoticeDao;
-
-    @Autowired
     private RestTemplate restTemplate;
 
     @Value("${opc_ua_server.address}")
@@ -70,8 +67,6 @@ public class ItemGroupServiceImpl implements ItemGroupService {
         String userId = userUtil.getCurrentUserId();
         itemGroup.setUserId(userId);
         Integer readMode = itemGroup.getReadMode();
-        String startReadTime = itemGroup.getStartReadTime();
-        String endReadTime = itemGroup.getEndReadTime();
         if (readMode != ConstantStr.ON_CHANGE && readMode != ConstantStr.EXCEED_SET_VALUE &&
                 readMode != ConstantStr.SPECIFY_TIME) {
             return Result.no(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "目前未适配此种类型的读取模式");
@@ -88,17 +83,11 @@ public class ItemGroupServiceImpl implements ItemGroupService {
             if (Blank.isEmpty(itemGroup.getReadModeType())) {
                 return Result.no(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "请选择一种比较类型");
             }
-            if (Blank.isEmpty(itemGroup.getModeValueTime())) {
-                return Result.no(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "请输入模式值");
+            if (Blank.isEmpty(itemGroup.getEventMode())) {
+                return Result.no(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "阈值条件");
             }
-        }
-        if (readMode == ConstantStr.SPECIFY_TIME) {
-            String modeValueTime = itemGroup.getModeValueTime();
-            Long startLong = DateUtil.strChangeDate(startReadTime, "HH:mm:ss").getTime();
-            Long endLong = DateUtil.strChangeDate(endReadTime, "HH:mm:ss").getTime();
-            Long modeLong = DateUtil.strChangeDate(modeValueTime, "HH:mm:ss").getTime();
-            if (!(modeLong > startLong && modeLong < endLong)) {
-                return Result.no(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "指定时间,是介于开始,结束读取时间之间");
+            if (Blank.isEmpty(itemGroup.getModeValue())) {
+                return Result.no(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "请输入模式值");
             }
         }
         DataSource dataSource = dataSourceDao.getDataSourceById(itemGroup.getDataSourceId());

+ 10 - 7
chaunyi_opc/opc_da/src/main/resources/mapper/ItemGroupDao.xml

@@ -4,7 +4,7 @@
 
     <sql id="itemGroup">
         id
-        , user_id, group_name, group_describe, data_source_id, read_mode, event_mode, read_mode_type, mode_value, mode_value_time,
+        , user_id, group_name, group_describe, data_source_id, read_mode, event_mode, read_mode_type, mode_value,
             read_week, run_state, cron_id, start_read_time, end_read_time , create_time
     </sql>
 
@@ -15,10 +15,12 @@
 
     <insert id="addItemGroup" parameterType="com.example.opc_common.entity.ItemGroup" useGeneratedKeys="true"
             keyProperty="id">
-        insert into t_item_group(user_id, group_name, group_describe, data_source_id, read_mode, event_mode, read_mode_type,
-                                 mode_value, mode_value_time, read_week, start_read_time, end_read_time, create_time)
-        values (#{userId}, #{groupName}, #{groupDescribe}, #{dataSourceId}, #{readMode}, #{eventMode}, #{readModeType}, #{modeValue},
-                #{modeValueTime}, #{readWeek}, #{startReadTime}, #{endReadTime}, now())
+        insert into t_item_group(user_id, group_name, group_describe, data_source_id, read_mode, event_mode,
+                                 read_mode_type,
+                                 mode_value, read_week, start_read_time, end_read_time, create_time)
+        values (#{userId}, #{groupName}, #{groupDescribe}, #{dataSourceId}, #{readMode}, #{eventMode}, #{readModeType},
+                #{modeValue},
+                #{readWeek}, #{startReadTime}, #{endReadTime}, now())
     </insert>
 
     <insert id="addItem">
@@ -40,7 +42,6 @@
             event_mode=#{eventMode},
             read_mode_type=#{readModeType},
             mode_value=#{modeValue},
-            mode_value_time=#{modeValueTime},
             read_week=#{readWeek},
             start_read_time=#{startReadTime},
             end_read_time=#{endReadTime}
@@ -119,7 +120,9 @@
         select
         <include refid="itemGroup"/>
         from t_item_group
-        where user_id = #{userId}
+        <if test="userId != null and userId != ''">
+            where user_id = #{userId}
+        </if>
     </select>
 
     <select id="getItemGroupByNameNoId" resultType="com.example.opc_common.entity.ItemGroup">

+ 1 - 91
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/config/InitRunner.java

@@ -1,30 +1,17 @@
 package com.example.opc_ua.config;
 
-import com.example.opc_common.entity.*;
+import com.example.opc_common.entity.DataSource;
 import com.example.opc_common.enums.DataSourceTypeEnum;
-import com.example.opc_common.enums.OpcDaDriverEnum;
-import com.example.opc_common.enums.ResultEnum;
-import com.example.opc_common.exception.CustomException;
 import com.example.opc_common.util.Blank;
 import com.example.opc_common.util.ConstantStr;
-import com.example.opc_common.util.DateUtil;
-import com.example.opc_ua.dao.DataModelDao;
 import com.example.opc_ua.dao.DataSourceDao;
-import com.example.opc_ua.dao.ItemGroupDao;
-import com.example.opc_ua.dao.MessageNoticeDao;
 import com.example.opc_ua.task.OpcAsyncTask;
-import com.example.opc_ua.task.OpcUaTask;
-import com.example.opc_ua.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
 
 import javax.annotation.PostConstruct;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Timer;
 
 @Slf4j
 @Configuration
@@ -36,22 +23,6 @@ public class InitRunner {
     @Autowired
     private OpcAsyncTask opcAsyncTask;
 
-    @Autowired
-    private ItemGroupDao itemGroupDao;
-
-    @Autowired
-    private DataModelDao dataModelDao;
-
-    @Autowired
-    private MessageNoticeDao messageNoticeDao;
-
-    @Autowired
-    private RedisUtil redisUtil;
-
-    @Value("${opc_storage.time_format}")
-    private String timeFormat;
-
-
     @PostConstruct
     public void initData() {
         //将ua和da的树,加载到redis中去
@@ -73,66 +44,5 @@ public class InitRunner {
                 }
             }
         }
-
-        //将已经启动的服务重新启动
-        List<ItemGroup> itemGroupList = itemGroupDao.getAllItemGroup(null);
-        if (Blank.isNotEmpty(itemGroupList)) {
-            for (ItemGroup itemGroup : itemGroupList) {
-                DataSource dataSource = DataSource.convertPassword(dataSourceDao.getDataSourceById(itemGroup.getDataSourceId()));
-                DataSourceType dataSourceType = dataSourceDao.getDataSourceTypeById(dataSource.getTypeId());
-                if (dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_REAL.getValue()) ||
-                        dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_HISTORY.getValue())) {
-                    if (itemGroup.getRunState() == ConstantStr.START_UP) {
-                        if (Blank.isEmpty(dataSource.getIpAddress(), dataSource.getIpUserName(), dataSource.getIpPassword())) {
-                            throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "ip地址,帐户,密码都不能为空");
-                        }
-                        Integer id = itemGroup.getId();
-                        redisUtil.set(ConstantStr.ITEM_GROUP + id, true);
-                        Boolean flage = false;
-                        //获取组中所有标签的数据模型,并转换为map<itemName,DataModel>
-                        List<Item> allItemList = itemGroupDao.getItemListByGroupId(id);
-                        itemGroup.setItemList(allItemList);
-                        if (Blank.isNotEmpty(allItemList)) {
-                            for (Item item : allItemList) {
-                                if (Blank.isNotEmpty(item.getDataModelId())) {
-                                    flage = true;
-                                    break;
-                                }
-                            }
-                        }
-                        Map<String, DataModel> map = new HashMap<>();
-                        if (flage) {
-                            List<DataModel> dmListByItemList = dataModelDao.getDmListByItemList(allItemList);
-                            if (Blank.isNotEmpty(dmListByItemList)) {
-                                for (int i = 0; i < allItemList.size(); i++) {
-                                    for (DataModel dm : dmListByItemList) {
-                                        if (dm.getId() == allItemList.get(i).getDataModelId()) {
-                                            map.put(allItemList.get(i).getItemReadName(), dm);
-                                            break;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        redisUtil.set(ConstantStr.ITEM_GROUP + id, true);
-                        Timer timer = new Timer();
-                        List<Item> itemList = itemGroupDao.getItemListByGroupId(id);
-                        //异步读取opcUA数据并保存
-                        timer.schedule(new OpcUaTask(redisUtil,
-                                        opcAsyncTask,
-                                        itemGroupDao,
-                                        messageNoticeDao,
-                                        timer,
-                                        itemGroup,
-                                        dataSource,
-                                        map,
-                                        itemList,
-                                        timeFormat),
-                                DateUtil.strYmdhmsChangeDate(DateUtil.getCurrentYmd() + " " + itemGroup.getStartReadTime()), ConstantStr.PERIOD_DAY);
-                        itemGroupDao.runItemGroupById(id, ConstantStr.START_UP);
-                    }
-                }
-            }
-        }
     }
 }

+ 61 - 0
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/config/SpringContextUtils.java

@@ -0,0 +1,61 @@
+package com.example.opc_ua.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SpringContextUtils implements ApplicationContextAware {
+
+    private static ApplicationContext applicationContext = null;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        if (SpringContextUtils.applicationContext == null) {
+            SpringContextUtils.applicationContext = applicationContext;
+        }
+    }
+
+    /**
+     * 获取ApplicationContext
+     *
+     * @return
+     */
+    public static ApplicationContext getApplicationContext() {
+        return applicationContext;
+    }
+
+    /**
+     * 通过name获取Bean
+     *
+     * @param name
+     * @return
+     */
+    public static Object getBean(String name) {
+        return getApplicationContext().getBean(name);
+    }
+
+    /**
+     * 通过class获取Bean
+     *
+     * @param clazz
+     * @param <T>
+     * @return
+     */
+    public static <T> T getBean(Class<T> clazz) {
+        return getApplicationContext().getBean(clazz);
+    }
+
+    /**
+     * 通过name和clazz返回指定的Bean
+     *
+     * @param name
+     * @param clazz
+     * @param <T>
+     * @return
+     */
+    public static <T> T getBean(String name, Class<T> clazz) {
+        return getApplicationContext().getBean(name, clazz);
+    }
+}

+ 3 - 1
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dao/ItemGroupDao.java

@@ -13,7 +13,9 @@ public interface ItemGroupDao {
 
     List<Item> getItemListByGroupId(Integer itemGroupId);
 
-    Integer runItemGroupById(Integer id, Integer runState);
+    Integer runItemGroupById(Integer id, Integer runState, String cronId);
+
+    Integer stopItemGroupById(Integer id, Integer runState);
 
     List<ItemGroup> getAllItemGroup(String userId);
 }

+ 8 - 1
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dao/RawDataDao.java

@@ -1,5 +1,6 @@
 package com.example.opc_ua.dao;
 
+import com.example.opc_common.entity.CursorRawData;
 import com.example.opc_common.entity.Item;
 import com.example.opc_common.entity.RawData;
 import org.springframework.stereotype.Repository;
@@ -13,7 +14,7 @@ public interface RawDataDao {
 
     Integer addRawDataList(Integer itemGroupId, Integer remainder, List<RawData> rawDataList);
 
-    Integer addTempRawData(RawData rawData);
+    Integer addCursorRawData(CursorRawData cursorRawData);
 
     RawData getRawDataList(Item item, Integer remainder, Integer dataSourceId, String valueBelongTime);
 
@@ -28,4 +29,10 @@ public interface RawDataDao {
     Integer delEventRawDataList(Item item, Integer dataSourceId, String valueBelongTime);
 
     Integer updateRawData(Integer remainder, RawData rawData);
+
+    List<Long> getMeetIndexList(Integer itemGroupId, Integer dataSourceId, List<Item> itemList, String valueBelongTime, Integer notMeetChange);
+
+    List<CursorRawData> getCursorRawDataList(Item item, Integer dataSourceId, String valueBelongTime, List<Long> indexList);
+
+    Integer delCursorRawDataList(Integer itemGroupId, Integer dataSourceId, String valueBelongTime);
 }

+ 77 - 0
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/CronTaskRegister.java

@@ -0,0 +1,77 @@
+package com.example.opc_ua.dynamicSchedule;
+
+import com.example.opc_common.dynamicSchedule.ScheduledTask;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.config.CronTask;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 定时器注册类,增加定时器任务,删除定时器任务
+ */
+@Component
+public class CronTaskRegister implements DisposableBean {
+
+    @Resource
+    TaskScheduler taskScheduler;
+
+    private final Map<String, ScheduledTask> scheduledTaskMap = new ConcurrentHashMap<>();
+
+    public TaskScheduler getTaskScheduler() {
+        return this.taskScheduler;
+    }
+
+    /**
+     * 新增定时器任务
+     *
+     * @param task
+     * @param cron
+     */
+    public void addCronTask(Runnable task, String cronId, String cron) {
+        addCronTask(new CronTask(task, cron), cronId);
+    }
+
+    public void addCronTask(CronTask cronTask, String cronId) {
+        if (cronTask != null) {
+            if (this.scheduledTaskMap.containsKey(cronId)) {
+                removeCronTask(cronId);
+            }
+            //重新添加
+            this.scheduledTaskMap.put(cronId, scheduleCronTask(cronTask));
+        }
+    }
+
+    /**
+     * 移除定时器
+     *
+     * @param cronId
+     */
+    public void removeCronTask(String cronId) {
+        try {
+            ScheduledTask scheduledTask = this.scheduledTaskMap.remove(cronId);
+            if (scheduledTask != null) {
+                scheduledTask.cancel();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public ScheduledTask scheduleCronTask(CronTask cronTask) {
+        ScheduledTask scheduledTask = new ScheduledTask();
+        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
+        return scheduledTask;
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        for (ScheduledTask task : this.scheduledTaskMap.values()) {
+            task.cancel();
+        }
+        this.scheduledTaskMap.clear();
+    }
+}

+ 81 - 0
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/DynamicScheduleConfig.java

@@ -0,0 +1,81 @@
+package com.example.opc_ua.dynamicSchedule;
+
+import com.example.opc_common.entity.DataSource;
+import com.example.opc_common.entity.DataSourceType;
+import com.example.opc_common.entity.ItemGroup;
+import com.example.opc_common.enums.DataSourceTypeEnum;
+import com.example.opc_common.enums.ResultEnum;
+import com.example.opc_common.exception.CustomException;
+import com.example.opc_common.util.Blank;
+import com.example.opc_common.util.ConstantStr;
+import com.example.opc_ua.dao.DataSourceDao;
+import com.example.opc_ua.dao.ItemGroupDao;
+import com.example.opc_ua.task.OpcUaTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.UUID;
+
+@Slf4j
+@Configuration
+public class DynamicScheduleConfig implements SchedulingConfigurer {
+
+    @Resource
+    CronTaskRegister cronTaskRegister;
+
+    @Resource
+    private DataSourceDao dataSourceDao;
+
+    @Resource
+    private ItemGroupDao itemGroupDao;
+
+    @Override
+    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
+
+        List<ItemGroup> itemGroupList = itemGroupDao.getAllItemGroup(null);
+        if (Blank.isNotEmpty(itemGroupList)) {
+            for (ItemGroup itemGroup : itemGroupList) {
+                String readWeek = itemGroup.getReadWeek();
+                DataSource dataSource = DataSource.convertPassword(dataSourceDao.getDataSourceById(itemGroup.getDataSourceId()));
+                DataSourceType dataSourceType = dataSourceDao.getDataSourceTypeById(dataSource.getTypeId());
+                if (dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_REAL.getValue()) ||
+                        dataSourceType.getDataSourceTypeKey().equals(DataSourceTypeEnum.OPC_UA_HISTORY.getValue())) {
+                    if (itemGroup.getRunState() == ConstantStr.START_UP) {
+                        if (Blank.isEmpty(dataSource.getIpAddress(), dataSource.getIpUserName(), dataSource.getIpPassword())) {
+                            throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "数据组配置的数据源的ip地址,帐户,密码都不能为空");
+                        }
+                        Integer id = itemGroup.getId();
+                        if (Blank.isEmpty(dataSource.getIpAddress(), dataSource.getIpPort(), dataSource.getIsAnonymous())) {
+                            throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "数据组配置的数据源的ip地址,端口号,匿名方式都不能为空");
+                        }
+                        if (dataSource.getIsAnonymous() == ConstantStr.NOT_ANONYMOUS) {
+                            if (Blank.isEmpty(dataSource.getIpUserName(), dataSource.getIpPassword())) {
+                                throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "选择不匿名方式,需要填写帐户和密码");
+                            }
+                        }
+                        //新增定时器任务
+                        String cronId = "";
+                        String cron = "0 0 0 ? * ";
+                        if (Blank.isEmpty(readWeek)) {
+                            cron = cron + "MON,TUE,WED,THU,FRI,SAT,SUN";
+                        } else {
+                            cron = cron + itemGroup.getReadWeek();
+                        }
+                        if (Blank.isEmpty(itemGroup.getCronId())) {
+                            cronId = UUID.randomUUID().toString().replace("-", "");
+                        } else {
+                            cronId = itemGroup.getCronId();
+                        }
+                        SchedulingRunnable task = new SchedulingRunnable(OpcUaTask.class, "opcUaTask", new Object[]{itemGroup, dataSource, cronId});
+                        cronTaskRegister.addCronTask(task, cronId, cron);
+                        itemGroupDao.runItemGroupById(id, ConstantStr.START_UP, cronId);
+                    }
+                }
+            }
+        }
+    }
+}

+ 24 - 0
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/ScheduleConfig.java

@@ -0,0 +1,24 @@
+package com.example.opc_ua.dynamicSchedule;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+@Configuration
+public class ScheduleConfig {
+
+    /**
+     * 线程池
+     * @return
+     */
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+        //定时任务执行线程池核心线程数
+        taskScheduler.setPoolSize(50);
+        taskScheduler.setRemoveOnCancelPolicy(true);
+        taskScheduler.setThreadNamePrefix("schedule-task-");
+        return taskScheduler;
+    }
+}

+ 57 - 0
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/dynamicSchedule/SchedulingRunnable.java

@@ -0,0 +1,57 @@
+package com.example.opc_ua.dynamicSchedule;
+
+import com.example.opc_ua.config.SpringContextUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.ReflectionUtils;
+
+import java.lang.reflect.Method;
+
+public class SchedulingRunnable implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SchedulingRunnable.class);
+
+    private Class clazz;
+    private String className;
+
+    private String methodName;
+
+    private Object[] params;
+
+    public SchedulingRunnable(Class clazz, String methodName, Object... params) {
+        this.clazz = clazz;
+        className = clazz.getName();
+        this.methodName = methodName;
+        this.params = params;
+    }
+
+    @Override
+    public void run() {
+        logger.info("定时任务开始执行 -bean:{},方法:{},参数:{}", className, methodName, params);
+
+        long startTime = System.currentTimeMillis();
+        try {
+            Object target = SpringContextUtils.getBean(clazz);
+            Method method = null;
+            if (null != params && params.length > 0) {
+                Class<?>[] paramCls = new Class[params.length];
+                for (int i = 0; i < params.length; i++) {
+                    paramCls[i] = params[i].getClass();
+                }
+                method = target.getClass().getDeclaredMethod(methodName, paramCls);
+            } else {
+                method = target.getClass().getDeclaredMethod(methodName);
+            }
+            ReflectionUtils.makeAccessible(method);
+            if (null != params && params.length > 0) {
+                method.invoke(target, params);
+            } else {
+                method.invoke(target);
+            }
+        } catch (Exception e) {
+            logger.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", className, methodName, params), e);
+        }
+        long times = System.currentTimeMillis() - startTime;
+        logger.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", className, methodName, params, times);
+    }
+}

+ 21 - 10
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/service/impl/ItemGroupServiceImpl.java

@@ -6,25 +6,27 @@ import com.example.opc_common.enums.ResultEnum;
 import com.example.opc_common.exception.CustomException;
 import com.example.opc_common.util.Blank;
 import com.example.opc_common.util.ConstantStr;
-import com.example.opc_common.util.DateUtil;
 import com.example.opc_common.util.Result;
 import com.example.opc_ua.dao.DataModelDao;
 import com.example.opc_ua.dao.DataSourceDao;
 import com.example.opc_ua.dao.ItemGroupDao;
 import com.example.opc_ua.dao.MessageNoticeDao;
+import com.example.opc_ua.dynamicSchedule.CronTaskRegister;
+import com.example.opc_ua.dynamicSchedule.SchedulingRunnable;
 import com.example.opc_ua.service.ItemGroupService;
 import com.example.opc_ua.task.OpcAsyncTask;
 import com.example.opc_ua.task.OpcUaTask;
-import com.example.opc_ua.task.OpcUaWeekTask;
 import com.example.opc_ua.util.OpcUaUtil;
 import com.example.opc_ua.util.RedisUtil;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 @Service
 @Transactional
@@ -43,6 +45,9 @@ public class ItemGroupServiceImpl implements ItemGroupService {
     private RedisUtil redisUtil;
 
     @Resource
+    private CronTaskRegister cronTaskRegister;
+
+    @Resource
     private OpcAsyncTask opcAsyncTask;
 
     @Resource
@@ -100,6 +105,7 @@ public class ItemGroupServiceImpl implements ItemGroupService {
     @Override
     public Result runItemGroupById(Integer id, Integer runState) {
         ItemGroup itemGroup = itemGroupDao.getItemGroupById(id);
+        String readWeek = itemGroup.getReadWeek();
         DataSource dataSource = DataSource.convertPassword(dataSourceDao.getDataSourceById(itemGroup.getDataSourceId()));
         DataSourceType dataSourceType = dataSourceDao.getDataSourceTypeById(dataSource.getTypeId());
 
@@ -117,23 +123,28 @@ public class ItemGroupServiceImpl implements ItemGroupService {
                         throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "选择不匿名方式,需要填写帐户和密码");
                     }
                 }
-                redisUtil.set(ConstantStr.ITEM_GROUP + id, true);
                 //新增定时器任务
                 String cronId = "";
-                String cron = "0 0 0 ? * " + itemGroup.getReadWeek();
+                String cron = "0 0 0 ? * ";
+                if (Blank.isEmpty(readWeek)) {
+                    cron = cron + "MON,TUE,WED,THU,FRI,SAT,SUN";
+                } else {
+                    cron = cron + itemGroup.getReadWeek();
+                }
                 if (Blank.isEmpty(itemGroup.getCronId())) {
                     cronId = UUID.randomUUID().toString().replace("-", "");
                 } else {
                     cronId = itemGroup.getCronId();
                 }
-                itemGroupDao.runItemGroupById(id, runState);
+                SchedulingRunnable task = new SchedulingRunnable(OpcUaTask.class, "opcUaTask", new Object[]{itemGroup, dataSource, cronId});
+                cronTaskRegister.addCronTask(task, cronId, cron);
+                itemGroupDao.runItemGroupById(id, runState, cronId);
                 return Result.ok("启动成功");
             } else {
-                throw new CustomException(ResultEnum.SERVER_ERROR.getRespCode(), "目前还没有此种类型的连接方式");
+                throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "目前还没有此种类型的连接方式");
             }
         } else if (runState.equals(ConstantStr.STOP_IT)) {
-            redisUtil.del(ConstantStr.ITEM_GROUP + id);
-            itemGroupDao.runItemGroupById(id, runState);
+            itemGroupDao.stopItemGroupById(id, runState);
             return Result.ok("停用成功");
         } else {
             throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), ResultEnum.REQUEST_WRONGPARAMS.getRespMsg());

+ 89 - 99
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcAsyncTask.java

@@ -7,6 +7,7 @@ import com.example.opc_common.entity.*;
 import com.example.opc_common.util.Blank;
 import com.example.opc_common.util.ConstantStr;
 import com.example.opc_common.util.DateUtil;
+import com.example.opc_common.util.JavaTypeUtil;
 import com.example.opc_ua.dao.RawDataDao;
 import com.example.opc_ua.dao.ReportTableDao;
 import com.example.opc_ua.util.OpcUaUtil;
@@ -50,11 +51,11 @@ public class OpcAsyncTask {
     /**
      * 新增一条临时数据
      *
-     * @param rawData
+     * @param cursorRawData
      */
-    public void addTempRawData(RawData rawData) {
-        if (Blank.isNotEmpty(rawData)) {
-            rawDataDao.addTempRawData(rawData);
+    public void addCursorRawData(CursorRawData cursorRawData) {
+        if (Blank.isNotEmpty(cursorRawData)) {
+            rawDataDao.addCursorRawData(cursorRawData);
         }
     }
 
@@ -131,68 +132,81 @@ public class OpcAsyncTask {
 
     public void packageRawDataList(List<Item> itemList, Integer dataSourceId, String sqlCurrentYmdh) {
         try {
-            Thread.sleep(5000);
+            Thread.sleep(3000);
         } catch (Exception e) {
             e.printStackTrace();
         }
-
         if (Blank.isNotEmpty(itemList)) {
-            Item item1 = itemList.get(0);
-            Integer itemGroupId = item1.getItemGroupId();
-            Integer remainder = itemGroupId % ConstantStr.SUB_TABLE_NUM;
-            for (Item item : itemList) {
-                RawData oldRawData = rawDataDao.getRawDataList(item, remainder, dataSourceId, sqlCurrentYmdh);
-                List<RawData> rawDataList = rawDataDao.getTempRawDataList(item, dataSourceId, sqlCurrentYmdh);
-                if (Blank.isNotEmpty(rawDataList)) {
-                    RawData rawData1 = rawDataList.get(0);
-                    String itemName = rawData1.getItemName();
-                    String dataType = rawData1.getDataType();
-                    if (dataType.equals("boolean")) {
-                        List<Boolean> dataList = Blank.isNotEmpty(oldRawData) ? new ArrayList<>(Arrays.asList(JSON.parseObject(oldRawData.getDataValue(), Boolean[].class))) : new ArrayList<>();
-                        List<String> dataTimeList = Blank.isNotEmpty(oldRawData) ? new ArrayList<>(Arrays.asList(JSON.parseObject(oldRawData.getDataValueTime(), String[].class))) : new ArrayList<>();
-                        for (RawData rawData : rawDataList) {
-                            dataList.add(JSON.parseObject(rawData.getDataValue(), Boolean.class));
-                            dataTimeList.add(rawData.getDataValueTime());
-                        }
-//                        RawData rawData = new RawData(itemGroupId, dataSourceId, itemName, dataType, JSON.toJSONString(dataList),
-//                                JSON.toJSONString(dataTimeList), sqlCurrentYmdh, new Date());
-//                        if (Blank.isEmpty(oldRawData)) {
-//                            rawDataDao.addRawData(remainder, rawData);
-//                        } else {
-//                            rawDataDao.updateRawData(remainder, rawData);
-//                        }
-                    } else {
-                        List<String> dataTimeList = Blank.isNotEmpty(oldRawData) ? new ArrayList<>(Arrays.asList(JSON.parseObject(oldRawData.getDataValueTime(), String[].class))) : new ArrayList<>();
-                        try {
-                            List<BigDecimal> dataList = Blank.isNotEmpty(oldRawData) ? new ArrayList<>(Arrays.asList(JSON.parseObject(oldRawData.getDataValue(), BigDecimal[].class))) : new ArrayList<>();
-                            for (RawData rawData : rawDataList) {
-                                BigDecimal bigDecimal = JSON.parseObject(rawData.getDataValue(), BigDecimal.class);
-                                dataList.add(bigDecimal);
-                                dataTimeList.add(rawData.getDataValueTime());
+            Item item_ = itemList.get(0);
+            Integer itemGroupId = item_.getItemGroupId();
+            int remainder = itemGroupId % ConstantStr.SUB_TABLE_NUM;
+            //获取原始数据表中,满足记录的数据
+            List<Long> indexList = rawDataDao.getMeetIndexList(itemGroupId, dataSourceId, itemList, sqlCurrentYmdh, ConstantStr.NOT_MEET_CHANGE);
+            if (Blank.isNotEmpty(indexList)) {
+                for (Item item : itemList) {
+                    //从临时表中获取相应的数据
+                    List<CursorRawData> cursorRawDataList = rawDataDao.getCursorRawDataList(item, dataSourceId, sqlCurrentYmdh, indexList);
+                    if (Blank.isNotEmpty(cursorRawDataList)) {
+                        CursorRawData cursorRawData_ = cursorRawDataList.get(0);
+                        String itemName = cursorRawData_.getItemName();
+                        String dataType = cursorRawData_.getDataType();
+                        if (dataType.toLowerCase().equals("boolean")) {
+                            List<Boolean> dataList = new ArrayList<>();
+                            List<String> dataTimeList = new ArrayList<>();
+                            List<Long> dataIndexList = new ArrayList<>();
+                            List<Integer> isMeetChangeList = new ArrayList<>();
+                            for (CursorRawData cursorRawData : cursorRawDataList) {
+                                dataList.add(JSON.parseObject(cursorRawData.getDataValue(), Boolean.class));
+                                dataTimeList.add(cursorRawData.getDataValueTime());
+                                dataIndexList.add(cursorRawData.getDataIndex());
+                                isMeetChangeList.add(cursorRawData.getIsMeetChange());
                             }
-//                            RawData rawData = new RawData(itemGroupId, dataSourceId, itemName, dataType, JSON.toJSONString(dataList),
-//                                    JSON.toJSONString(dataTimeList), sqlCurrentYmdh, new Date());
-//                            if (Blank.isEmpty(oldRawData)) {
-//                                rawDataDao.addRawData(remainder, rawData);
-//                            } else {
-//                                rawDataDao.updateRawData(remainder, rawData);
-//                            }
-                        } catch (Exception e) {
-                            List<String> dataList = Blank.isNotEmpty(oldRawData) ? new ArrayList<>(Arrays.asList(JSON.parseObject(oldRawData.getDataValue(), String[].class))) : new ArrayList<>();
-                            for (RawData rawData : rawDataList) {
-                                dataList.add(rawData.getDataValue());
-                                dataTimeList.add(rawData.getDataValueTime());
+                            RawData rawData = new RawData(itemGroupId, dataSourceId, itemName, dataType, JSON.toJSONString(dataList),
+                                    JSON.toJSONString(dataTimeList), JSON.toJSONString(dataIndexList), sqlCurrentYmdh, JSON.toJSONString(isMeetChangeList), new Date());
+                            rawDataDao.addRawData(remainder, rawData);
+                        } else {
+                            try {
+                                List<BigDecimal> dataList = new ArrayList<>();
+                                List<String> dataTimeList = new ArrayList<>();
+                                List<Long> dataIndexList = new ArrayList<>();
+                                List<Integer> isMeetChangeList = new ArrayList<>();
+                                for (CursorRawData cursorRawData : cursorRawDataList) {
+                                    dataList.add(JSON.parseObject(cursorRawData.getDataValue(), BigDecimal.class));
+                                    dataTimeList.add(cursorRawData.getDataValueTime());
+                                    dataIndexList.add(cursorRawData.getDataIndex());
+                                    isMeetChangeList.add(cursorRawData.getIsMeetChange());
+                                }
+                                RawData rawData = new RawData(itemGroupId, dataSourceId, itemName, dataType, JSON.toJSONString(dataList),
+                                        JSON.toJSONString(dataTimeList), JSON.toJSONString(dataIndexList), sqlCurrentYmdh, JSON.toJSONString(isMeetChangeList), new Date());
+                                rawDataDao.addRawData(remainder, rawData);
+                            } catch (Exception e) {
+                                List<String> dataList = new ArrayList<>();
+                                List<String> dataTimeList = new ArrayList<>();
+                                List<Long> dataIndexList = new ArrayList<>();
+                                List<Integer> isMeetChangeList = new ArrayList<>();
+                                for (CursorRawData cursorRawData : cursorRawDataList) {
+                                    dataList.add(cursorRawData.getDataValue());
+                                    dataTimeList.add(cursorRawData.getDataValueTime());
+                                    dataIndexList.add(cursorRawData.getDataIndex());
+                                    isMeetChangeList.add(cursorRawData.getIsMeetChange());
+                                }
+                                RawData rawData = new RawData(itemGroupId, dataSourceId, itemName, dataType, JSON.toJSONString(dataList),
+                                        JSON.toJSONString(dataTimeList), JSON.toJSONString(dataIndexList), sqlCurrentYmdh, JSON.toJSONString(isMeetChangeList), new Date());
+                                rawDataDao.addRawData(remainder, rawData);
                             }
-//                            RawData rawData = new RawData(itemGroupId, dataSourceId, itemName, dataType, JSON.toJSONString(dataList),
-//                                    JSON.toJSONString(dataTimeList), sqlCurrentYmdh, new Date());
-//                            if (Blank.isEmpty(oldRawData)) {
-//                                rawDataDao.addRawData(remainder, rawData);
-//                            } else {
-//                                rawDataDao.updateRawData(remainder, rawData);
-//                            }
                         }
                     }
-                    rawDataDao.delRawDataList(rawDataList, sqlCurrentYmdh);
+                }
+            }
+            //删除临时表中的相应的数据
+            rawDataDao.delCursorRawDataList(itemGroupId, dataSourceId, sqlCurrentYmdh);
+
+            //如果有事件驱动报表,则生成事件驱动报表
+            for (Item item : itemList) {
+                Integer tableReportId = item.getTableReportId();
+                if (Blank.isNotEmpty(tableReportId)) {
+                    eventTable(tableReportId, dataSourceId, itemList, sqlCurrentYmdh);
+                    break;
                 }
             }
         }
@@ -219,89 +233,65 @@ public class OpcAsyncTask {
     /**
      * 事件驱动报表异步添加数据
      */
-    public void eventTable(Integer dataSourceId, List<Item> itemList, String sqlCurrentYmdh) {
-        try {
-            Thread.sleep(5000);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        Integer tableReportId = null;
-        if (Blank.isNotEmpty(itemList)) {
-            for (Item item : itemList) {
-//                if (Blank.isNotEmpty(item.getEventMode()) && Blank.isNotEmpty(item.getEventValue()) && Blank.isNotEmpty(item.getTableReportId())) {
-//                    tableReportId = item.getTableReportId();
-//                    break;
-//                }
-            }
-        }
-        if (Blank.isEmpty(tableReportId)) {
-            return;
-        }
+    public void eventTable(Integer tableReportId,Integer dataSourceId, List<Item> itemList, String sqlCurrentYmdh) {
         ReportTable reportTable = reportTableDao.getReportTableById(tableReportId);
         String reportTableData = reportTable.getReportTableData();
         JSONObject jsonObject = JSONObject.parseObject(reportTableData);
         JSONArray objects = new JSONArray();
         if (Blank.isNotEmpty(itemList)) {
+            Integer remainder = itemList.get(0).getItemGroupId() % ConstantStr.SUB_TABLE_NUM;
             for (int i = 0; i < itemList.size(); i++) {
-                List<RawData> rawDataList = rawDataDao.getEventRawDataList(itemList.get(i), dataSourceId, sqlCurrentYmdh);
-                if (Blank.isNotEmpty(rawDataList)) {
+                RawData rawData = rawDataDao.getRawDataList(itemList.get(i), remainder, dataSourceId, sqlCurrentYmdh);
+                if (Blank.isNotEmpty(rawData)) {
                     JSONObject jsonObject1 = new JSONObject();
-                    RawData rawData = rawDataList.get(0);
                     String dataType = rawData.getDataType();
                     if (dataType.toLowerCase().equals("boolean")) {
-                        List<Boolean> dataList = new ArrayList<>();
-                        List<String> dataTimeList = new ArrayList<>();
-                        for (RawData rawData1 : rawDataList) {
-                            dataList.add(JSON.parseObject(rawData1.getDataValue(), Boolean.class));
-                            dataTimeList.add(rawData1.getDataValueTime());
-                        }
+                        List<Boolean> dataList = JavaTypeUtil.objChangeListBool(rawData.getDataValue());
+                        List<String> dataTimeList = JavaTypeUtil.objChangeListStr(rawData.getDataValueTime());
+                        List<Long> dataIndexList = JavaTypeUtil.objChangeListLong(rawData.getDataIndex());
                         jsonObject1.put("itemGroupId", rawData.getItemGroupId());
                         jsonObject1.put("itemGroupName", itemList.get(i).getItemGroupName());
                         jsonObject1.put("itemName", itemList.get(i).getItemName());
                         jsonObject1.put("describe", Blank.isEmpty(itemList.get(i).getDescribe()) ? itemList.get(i).getItemName() : itemList.get(i).getDescribe());
                         jsonObject1.put("dataList", dataList);
                         jsonObject1.put("dataTimeList", dataTimeList);
+                        jsonObject1.put("dataIndexList", dataIndexList);
                         objects.set(i, jsonObject1);
                     } else {
                         try {
-                            List<BigDecimal> dataList = new ArrayList<>();
-                            List<String> dataTimeList = new ArrayList<>();
-                            for (RawData rawData1 : rawDataList) {
-                                dataList.add(JSON.parseObject(rawData1.getDataValue(), BigDecimal.class));
-                                dataTimeList.add(rawData1.getDataValueTime());
-                            }
+                            List<BigDecimal> dataList = JavaTypeUtil.objChangeListBig(rawData.getDataValue());
+                            List<String> dataTimeList = JavaTypeUtil.objChangeListStr(rawData.getDataValueTime());
+                            List<Long> dataIndexList = JavaTypeUtil.objChangeListLong(rawData.getDataIndex());
                             jsonObject1.put("itemGroupId", rawData.getItemGroupId());
                             jsonObject1.put("itemGroupName", itemList.get(i).getItemGroupName());
                             jsonObject1.put("itemName", itemList.get(i).getItemName());
                             jsonObject1.put("describe", Blank.isEmpty(itemList.get(i).getDescribe()) ? itemList.get(i).getItemName() : itemList.get(i).getDescribe());
                             jsonObject1.put("dataList", dataList);
                             jsonObject1.put("dataTimeList", dataTimeList);
+                            jsonObject1.put("dataIndexList", dataIndexList);
                             objects.set(i, jsonObject1);
                         } catch (Exception e) {
-                            List<String> dataList = new ArrayList<>();
-                            List<String> dataTimeList = new ArrayList<>();
-                            for (RawData rawData1 : rawDataList) {
-                                dataList.add(rawData1.getDataValue());
-                                dataTimeList.add(rawData1.getDataValueTime());
-                            }
+                            List<String> dataList = JavaTypeUtil.objChangeListStr(rawData.getDataValue());
+                            List<String> dataTimeList = JavaTypeUtil.objChangeListStr(rawData.getDataValueTime());
+                            List<Long> dataIndexList = JavaTypeUtil.objChangeListLong(rawData.getDataIndex());
                             jsonObject1.put("itemGroupId", rawData.getItemGroupId());
                             jsonObject1.put("itemGroupName", itemList.get(i).getItemGroupName());
                             jsonObject1.put("itemName", itemList.get(i).getItemName());
                             jsonObject1.put("describe", Blank.isEmpty(itemList.get(i).getDescribe()) ? itemList.get(i).getItemName() : itemList.get(i).getDescribe());
                             jsonObject1.put("dataList", dataList);
                             jsonObject1.put("dataTimeList", dataTimeList);
+                            jsonObject1.put("dataIndexList", dataIndexList);
                             objects.set(i, jsonObject1);
                         }
                     }
                 }
-                rawDataDao.delEventRawDataList(itemList.get(i), dataSourceId, sqlCurrentYmdh);
             }
         }
         jsonObject.put("eventTables", objects);
         ReportTable reportTable1 = new ReportTable();
         reportTable1.setTableTemplateId(reportTable.getTableTemplateId());
         reportTable1.setUserId(reportTable.getUserId());
-        reportTable1.setReportTableName(reportTable.getReportTableName() + "_" + DateUtil.dateChangeStr(new Date(), "yyyyMMddHHmmss"));
+        reportTable1.setReportTableName(reportTable.getReportTableName() + "_" + (Blank.isNotEmpty(itemList) ? itemList.get(0).getItemGroupName() : "") + "_" + DateUtil.dateChangeStr(new Date(), "yyyyMMddHHmmss"));
         reportTable1.setReportTableData(jsonObject.toJSONString());
         reportTable1.setReportValueFormat(reportTable.getReportValueFormat());
         reportTable1.setIsAutoReport(ConstantStr.EVENT_GENERATE_REPORT);

+ 53 - 51
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaChangeTask.java

@@ -2,9 +2,13 @@ package com.example.opc_ua.task;
 
 import com.alibaba.fastjson.JSON;
 import com.example.opc_common.entity.*;
-import com.example.opc_common.util.*;
+import com.example.opc_common.util.Blank;
+import com.example.opc_common.util.ConstantStr;
+import com.example.opc_common.util.DateUtil;
+import com.example.opc_common.util.MathUtil;
 import com.example.opc_ua.dao.ItemGroupDao;
 import com.example.opc_ua.dao.MessageNoticeDao;
+import com.example.opc_ua.dynamicSchedule.CronTaskRegister;
 import com.example.opc_ua.util.OpcUaUtil;
 import com.example.opc_ua.util.RedisUtil;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
@@ -21,6 +25,10 @@ public class OpcUaChangeTask extends TimerTask {
 
     private final OpcAsyncTask opcAsyncTask;
 
+    private final String cronId;
+
+    private final CronTaskRegister cronTaskRegister;
+
     private final ItemGroupDao itemGroupDao;
 
     private final MessageNoticeDao messageNoticeDao;
@@ -39,10 +47,16 @@ public class OpcUaChangeTask extends TimerTask {
 
     private final String timeFormat;
 
-    private Boolean eventFlage = false;
+    private final Long endTime;
+
+    private String sqlCurrentYmdh;
+
+    private Long index = 0L;
 
     public OpcUaChangeTask(RedisUtil redisUtil,
                            OpcAsyncTask opcAsyncTask,
+                           String cronId,
+                           CronTaskRegister cronTaskRegister,
                            ItemGroupDao itemGroupDao,
                            MessageNoticeDao messageNoticeDao,
                            OpcUaClient opcUaClient,
@@ -51,9 +65,13 @@ public class OpcUaChangeTask extends TimerTask {
                            ItemGroup itemGroup,
                            DataSource dataSource,
                            Timer timer,
-                           String timeFormat) {
+                           String timeFormat,
+                           Long endTime
+    ) {
         this.redisUtil = redisUtil;
         this.opcAsyncTask = opcAsyncTask;
+        this.cronId = cronId;
+        this.cronTaskRegister = cronTaskRegister;
         this.itemGroupDao = itemGroupDao;
         this.messageNoticeDao = messageNoticeDao;
         this.opcUaClient = opcUaClient;
@@ -63,19 +81,22 @@ public class OpcUaChangeTask extends TimerTask {
         this.dataSource = dataSource;
         this.timer = timer;
         this.timeFormat = timeFormat;
+        this.endTime = endTime;
     }
 
     @Override
     public void run() {
+        index++;
         Integer id = itemGroup.getId();
+        Integer readModeType = itemGroup.getReadModeType();
+        Double modeValue = itemGroup.getModeValue();
         Integer dataSourceId = dataSource.getId();
         for (Item item : itemList) {
             String itemId = item.getItemReadName();
-            Integer tableReportId = item.getTableReportId();
             NodeId n = new NodeId(item.getNodeIndex(), itemId);
             DataModel dm = map.get(itemId);
             Object redisValue = redisUtil.get(ConstantStr.VALUE + id + itemId);
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id + itemId));
+            BigDecimal oldValue = JSON.parseObject(redisValue.toString(), BigDecimal.class);
             try {
                 DataValue dataValue = opcUaClient.readValue(0.0, TimestampsToReturn.Both, n).get();
                 if (!dataValue.getStatusCode().isGood()) {
@@ -87,88 +108,69 @@ public class OpcUaChangeTask extends TimerTask {
 
                 //归属时间
                 String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
+                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
                 if (Blank.isEmpty(sqlCurrentYmdh)) {
                     sqlCurrentYmdh = currentYmdh;
-                    redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
                 } else {
                     if (!sqlCurrentYmdh.equals(currentYmdh)) {
                         //组装相应的数据
-                        opcAsyncTask.packageRawData(item, dataSourceId, sqlCurrentYmdh);
-                        if (Blank.isNotEmpty(tableReportId)) {
-                            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                        }
-                        redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                        opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                         sqlCurrentYmdh = currentYmdh;
                     }
                 }
-                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
                 if (javaType.toLowerCase().equals("boolean")) {
-                    if (JSON.parseObject(redisValue.toString(), Boolean.class) != JSON.parseObject(value.toString(), Boolean.class)) {
-                        redisUtil.set(ConstantStr.VALUE + id + itemId, JSON.parseObject(value.toString(), Boolean.class), ConstantStr.TWO_HOUR);
-                        Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
-//                        RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
-//                                currentYmdhmss, currentYmdh, new Date());
-//                        opcAsyncTask.addTempRawData(rawData);
-
-                    }
+                    Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
+                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                    opcAsyncTask.addCursorRawData(cursorRawData);
                 } else {
                     try {
                         BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
                         BigDecimal dmData = Blank.isNotEmpty(dm) ?
                                 MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
                                 bigDecimal;
-//                        if (JSON.parseObject(redisValue.toString(), BigDecimal.class).compareTo(dmData) != 0) {
-//                            RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
-//                                    currentYmdhmss, currentYmdh, new Date());
-//                            opcAsyncTask.addTempRawData(rawData);
-//
-//                        }
+                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                currentYmdhmss, index, currentYmdh,
+                                Blank.isEmpty(oldValue) ? ConstantStr.NOT_MEET_CHANGE : MathUtil.isMeetChange(oldValue,
+                                        dmData, new BigDecimal(modeValue), readModeType), new Date());
+                        opcAsyncTask.addCursorRawData(cursorRawData);
+                        redisUtil.set(ConstantStr.VALUE + id + itemId, dmData, ConstantStr.TWO_HOUR);
                     } catch (Exception e) {
-//                        RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
-//                                currentYmdhmss, currentYmdh, new Date());
-//                        opcAsyncTask.addTempRawData(rawData);
-
+                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
+                                currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                        opcAsyncTask.addCursorRawData(cursorRawData);
                     }
                 }
             } catch (Exception e) {
                 //执行组装数据库的数据,以及生成驱动报表
                 opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-                if ( Blank.isNotEmpty(tableReportId)) {
-                    opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                }
-                redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
+                redisUtil.del(ConstantStr.VALUE + id + itemId);
 
                 messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
                         itemGroup.getGroupName() + "-" + itemId + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
-                        e.getMessage(),
+                        OpcUaUtil.genException(e.getMessage()),
                         ConstantStr.NO_READ));
+
+                itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
                 if (Blank.isNotEmpty(opcUaClient)) {
                     opcUaClient.disconnect();
                 }
-
-                redisUtil.del(ConstantStr.ITEM_GROUP + id);
-                itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-                timer.cancel();
+                cronTaskRegister.removeCronTask(cronId);
+                if (Blank.isNotEmpty(timer)) {
+                    timer.cancel();
+                }
             }
         }
-        Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
-        if (Blank.isEmpty(flage)) {
-            flage = false;
-        }
-        if (!flage) {
+        if (System.currentTimeMillis() >= endTime) {
             //执行组装数据库的数据,以及生成驱动报表
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
             opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-            redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
 
             if (Blank.isNotEmpty(opcUaClient)) {
                 opcUaClient.disconnect();
             }
-
-            redisUtil.del(ConstantStr.ITEM_GROUP + id);
-            itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-            timer.cancel();
+            if (Blank.isNotEmpty(timer)) {
+                timer.cancel();
+            }
         }
     }
 }

+ 80 - 77
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaExceedTask.java

@@ -2,11 +2,14 @@ package com.example.opc_ua.task;
 
 import com.alibaba.fastjson.JSON;
 import com.example.opc_common.entity.*;
-import com.example.opc_common.util.*;
+import com.example.opc_common.util.Blank;
+import com.example.opc_common.util.ConstantStr;
+import com.example.opc_common.util.DateUtil;
+import com.example.opc_common.util.MathUtil;
 import com.example.opc_ua.dao.ItemGroupDao;
 import com.example.opc_ua.dao.MessageNoticeDao;
+import com.example.opc_ua.dynamicSchedule.CronTaskRegister;
 import com.example.opc_ua.util.OpcUaUtil;
-import com.example.opc_ua.util.RedisUtil;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -17,10 +20,12 @@ import java.util.*;
 
 public class OpcUaExceedTask extends TimerTask {
 
-    private final RedisUtil redisUtil;
-
     private final OpcAsyncTask opcAsyncTask;
 
+    private final String cronId;
+
+    private final CronTaskRegister cronTaskRegister;
+
     private final ItemGroupDao itemGroupDao;
 
     private final MessageNoticeDao messageNoticeDao;
@@ -39,10 +44,15 @@ public class OpcUaExceedTask extends TimerTask {
 
     private final String timeFormat;
 
-    private Boolean eventFlage = false;
+    private final Long endTime;
+
+    private String sqlCurrentYmdh;
+
+    private Long index = 0L;
 
-    public OpcUaExceedTask(RedisUtil redisUtil,
-                           OpcAsyncTask opcAsyncTask,
+    public OpcUaExceedTask(OpcAsyncTask opcAsyncTask,
+                           String cronId,
+                           CronTaskRegister cronTaskRegister,
                            ItemGroupDao itemGroupDao,
                            MessageNoticeDao messageNoticeDao,
                            OpcUaClient opcUaClient,
@@ -51,9 +61,11 @@ public class OpcUaExceedTask extends TimerTask {
                            ItemGroup itemGroup,
                            DataSource dataSource,
                            Timer timer,
-                           String timeFormat) {
-        this.redisUtil = redisUtil;
+                           String timeFormat,
+                           Long endTime) {
         this.opcAsyncTask = opcAsyncTask;
+        this.cronId = cronId;
+        this.cronTaskRegister = cronTaskRegister;
         this.itemGroupDao = itemGroupDao;
         this.messageNoticeDao = messageNoticeDao;
         this.opcUaClient = opcUaClient;
@@ -63,11 +75,15 @@ public class OpcUaExceedTask extends TimerTask {
         this.dataSource = dataSource;
         this.timer = timer;
         this.timeFormat = timeFormat;
+        this.endTime = endTime;
     }
 
     @Override
     public void run() {
+        index++;
         Integer id = itemGroup.getId();
+        Integer eventMode = itemGroup.getEventMode();
+        Double modeValue = itemGroup.getModeValue();
         Integer dataSourceId = dataSource.getId();
         for (Item item : itemList) {
             String itemId = item.getItemReadName();
@@ -75,7 +91,6 @@ public class OpcUaExceedTask extends TimerTask {
             NodeId n = new NodeId(item.getNodeIndex(), itemId);
             DataModel dm = map.get(itemId);
             BigDecimal bigModeValue = new BigDecimal(itemGroup.getModeValue());
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id + itemId));
             try {
                 DataValue dataValue = opcUaClient.readValue(0.0, TimestampsToReturn.Both, n).get();
                 if (!dataValue.getStatusCode().isGood()) {
@@ -86,112 +101,100 @@ public class OpcUaExceedTask extends TimerTask {
                 Date time = dataValue.getServerTime().getJavaDate();
 
                 String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
+                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
                 if (Blank.isEmpty(sqlCurrentYmdh)) {
                     sqlCurrentYmdh = currentYmdh;
-                    redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
                 } else {
                     if (!sqlCurrentYmdh.equals(currentYmdh)) {
                         //组装相应的数据
-                        opcAsyncTask.packageRawData(item, dataSourceId, sqlCurrentYmdh);
-                        if ( Blank.isNotEmpty(tableReportId)) {
-                            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                        }
-                        redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                        opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                         sqlCurrentYmdh = currentYmdh;
                     }
                 }
-                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
-                if (!javaType.toLowerCase().equals("boolean")) {
+                if (javaType.toLowerCase().equals("boolean")) {
+                    Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
+                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                    opcAsyncTask.addCursorRawData(cursorRawData);
+                } else {
                     try {
                         BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
                         BigDecimal dmData = Blank.isNotEmpty(dm) ?
                                 MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
                                 bigDecimal;
-                        if (dmData.compareTo(bigModeValue) == 1) {
-//                            RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
-//                                    currentYmdhmss, currentYmdh, new Date());
-//                            opcAsyncTask.addTempRawData(rawData);
-
-                            if ( Blank.isNotEmpty(tableReportId)) {
-//                                BigDecimal bigEventValue = new BigDecimal(eventValue);
-//                                if (eventMode.equals(ConstantStr.EVENT_MODEL_EXCEED)) {
-//                                    if (dmData.compareTo(bigEventValue) == 1) {
-//                                        eventFlage = true;
-//                                        opcAsyncTask.addEventRawData(rawData);
-//                                    } else {
-//                                        eventFlage = false;
-//                                    }
-//                                } else if (eventMode.equals(ConstantStr.EVENT_MODEL_LOWER)) {
-//                                    if (dmData.compareTo(bigEventValue) == -1) {
-//                                        eventFlage = true;
-//                                        opcAsyncTask.addEventRawData(rawData);
-//                                    } else {
-//                                        eventFlage = false;
-//                                    }
-//                                }
-                            } else {
-                                if (eventFlage) {
-//                                    opcAsyncTask.addEventRawData(rawData);
+                        if (Blank.isNotEmpty(tableReportId)) {
+                            BigDecimal bigDecimal1 = new BigDecimal(modeValue);
+                            if (eventMode == ConstantStr.EVENT_MODEL_EXCEED) {
+                                if (dmData.compareTo(bigDecimal1) == 1) {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.NOT_MEET_CHANGE, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                } else {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                }
+                            } else if (eventMode == ConstantStr.EVENT_MODEL_LOWER) {
+                                if (dmData.compareTo(bigDecimal1) == -1) {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.NOT_MEET_CHANGE, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                } else {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                }
+                            } else if (eventMode == ConstantStr.EVENT_MODEL_EQUAL) {
+                                if (dmData.compareTo(bigDecimal1) == 0) {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.NOT_MEET_CHANGE, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
+                                } else {
+                                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                            currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                                    opcAsyncTask.addCursorRawData(cursorRawData);
                                 }
                             }
+                        } else {
+                            CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                    currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                            opcAsyncTask.addCursorRawData(cursorRawData);
                         }
                     } catch (Exception e) {
-//                        RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
-//                                currentYmdhmss, currentYmdh, new Date());
-//                        opcAsyncTask.addTempRawData(rawData);
-//
-//                        if (Blank.isEmpty(eventMode, eventValue, tableReportId)) {
-//                            if (eventFlage) {
-//                                opcAsyncTask.addEventRawData(rawData);
-//                            }
-//                        }
+                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
+                                currentYmdhmss, index, currentYmdh, ConstantStr.IS_MEET_CHANGE, new Date());
+                        opcAsyncTask.addCursorRawData(cursorRawData);
                     }
                 }
             } catch (Exception e) {
                 //执行组装数据库的数据,以及生成驱动报表
                 opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-                if (  Blank.isNotEmpty(tableReportId)) {
-                    opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                }
-                redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
 
                 messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
                         itemGroup.getGroupName() + "-" + itemId + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
                         e.getMessage(),
                         ConstantStr.NO_READ));
 
+                itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
                 if (Blank.isNotEmpty(opcUaClient)) {
                     opcUaClient.disconnect();
                 }
-
-                redisUtil.del(ConstantStr.ITEM_GROUP + id);
-                itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-                timer.cancel();
+                cronTaskRegister.removeCronTask(cronId);
+                if (Blank.isNotEmpty(timer)) {
+                    timer.cancel();
+                }
             }
         }
-        Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
-        if (Blank.isEmpty(flage)) {
-            flage = false;
-        }
-        if (!flage) {
+        if (System.currentTimeMillis() >= endTime) {
             //执行组装数据库的数据,以及生成驱动报表
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
             opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-            redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
-
-            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行停止",
-                    "请刷新检查组的运行状态",
-                    ConstantStr.NO_READ));
 
             if (Blank.isNotEmpty(opcUaClient)) {
                 opcUaClient.disconnect();
             }
-
-            redisUtil.del(ConstantStr.ITEM_GROUP + id);
-            itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-            timer.cancel();
+            if (Blank.isNotEmpty(timer)) {
+                timer.cancel();
+            }
         }
     }
 }

+ 56 - 107
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaFrequencyTask.java

@@ -2,11 +2,14 @@ package com.example.opc_ua.task;
 
 import com.alibaba.fastjson.JSON;
 import com.example.opc_common.entity.*;
-import com.example.opc_common.util.*;
+import com.example.opc_common.util.Blank;
+import com.example.opc_common.util.ConstantStr;
+import com.example.opc_common.util.DateUtil;
+import com.example.opc_common.util.MathUtil;
 import com.example.opc_ua.dao.ItemGroupDao;
 import com.example.opc_ua.dao.MessageNoticeDao;
+import com.example.opc_ua.dynamicSchedule.CronTaskRegister;
 import com.example.opc_ua.util.OpcUaUtil;
-import com.example.opc_ua.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
@@ -19,10 +22,12 @@ import java.util.*;
 @Slf4j
 public class OpcUaFrequencyTask extends TimerTask {
 
-    private final RedisUtil redisUtil;
-
     private final OpcAsyncTask opcAsyncTask;
 
+    private final String cronId;
+
+    private final CronTaskRegister cronTaskRegister;
+
     private final ItemGroupDao itemGroupDao;
 
     private final MessageNoticeDao messageNoticeDao;
@@ -41,21 +46,29 @@ public class OpcUaFrequencyTask extends TimerTask {
 
     private final String timeFormat;
 
-    private Boolean eventFlage = false;
-
-    public OpcUaFrequencyTask(RedisUtil redisUtil,
-                              OpcAsyncTask opcAsyncTask,
-                              ItemGroupDao itemGroupDao,
-                              MessageNoticeDao messageNoticeDao,
-                              OpcUaClient opcUaClient,
-                              List<Item> itemList,
-                              Map<String, DataModel> map,
-                              ItemGroup itemGroup,
-                              DataSource dataSource,
-                              Timer timer,
-                              String timeFormat) {
-        this.redisUtil = redisUtil;
+    private final Long endTime;
+
+    private String sqlCurrentYmdh;
+
+    private Long index = 0L;
+
+    public OpcUaFrequencyTask(
+            OpcAsyncTask opcAsyncTask,
+            String cronId,
+            CronTaskRegister cronTaskRegister,
+            ItemGroupDao itemGroupDao,
+            MessageNoticeDao messageNoticeDao,
+            OpcUaClient opcUaClient,
+            List<Item> itemList,
+            Map<String, DataModel> map,
+            ItemGroup itemGroup,
+            DataSource dataSource,
+            Timer timer,
+            String timeFormat,
+            Long endTime) {
         this.opcAsyncTask = opcAsyncTask;
+        this.cronId = cronId;
+        this.cronTaskRegister = cronTaskRegister;
         this.itemGroupDao = itemGroupDao;
         this.messageNoticeDao = messageNoticeDao;
         this.opcUaClient = opcUaClient;
@@ -65,18 +78,18 @@ public class OpcUaFrequencyTask extends TimerTask {
         this.dataSource = dataSource;
         this.timer = timer;
         this.timeFormat = timeFormat;
+        this.endTime = endTime;
     }
 
     @Override
     public void run() {
+        index++;
         Integer id = itemGroup.getId();
         Integer dataSourceId = dataSource.getId();
         for (Item item : itemList) {
             String itemId = item.getItemReadName();
-            Integer tableReportId = item.getTableReportId();
             NodeId n = new NodeId(item.getNodeIndex(), itemId);
             DataModel dm = map.get(itemId);
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id + itemId));
             try {
                 DataValue dataValue = opcUaClient.readValue(0.0, TimestampsToReturn.Both, n).get();
                 if (!dataValue.getStatusCode().isGood()) {
@@ -87,131 +100,67 @@ public class OpcUaFrequencyTask extends TimerTask {
                 Date time = dataValue.getServerTime().getJavaDate();
 
                 String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
+                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
                 if (Blank.isEmpty(sqlCurrentYmdh)) {
                     sqlCurrentYmdh = currentYmdh;
-                    redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
                 } else {
                     if (!sqlCurrentYmdh.equals(currentYmdh)) {
                         //组装相应的数据
-                        opcAsyncTask.packageRawData(item, dataSourceId, sqlCurrentYmdh);
-                        if ( Blank.isNotEmpty(tableReportId)) {
-                            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                        }
-                        redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
+                        opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
                         sqlCurrentYmdh = currentYmdh;
                     }
                 }
-                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
+
                 if (javaType.toLowerCase().equals("boolean")) {
                     //存数据
                     Boolean dmData = JSON.parseObject(value.toString(), Boolean.class);
-//                    RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
-//                            currentYmdhmss, currentYmdh, new Date());
-//                    opcAsyncTask.addTempRawData(rawData);
-
-                    if ( Blank.isNotEmpty(tableReportId)) {
-//                        if (eventMode.equals(ConstantStr.EVENT_MODEL_BOOLEAN)) {
-//                            if ((dmData ? ConstantStr.BOOLEAN_TRUE : ConstantStr.BOOLEAN_FALSE).equals(eventValue.intValue())) {
-//                                eventFlage = true;
-//                                opcAsyncTask.addEventRawData(rawData);
-//                            } else {
-//                                eventFlage = false;
-//                            }
-//                        }
-                    } else {
-                        if (eventFlage) {
-//                            opcAsyncTask.addEventRawData(rawData);
-                        }
-                    }
+                    CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                            currentYmdhmss, index, currentYmdh, ConstantStr.NOT_MEET_CHANGE, new Date());
+                    opcAsyncTask.addCursorRawData(cursorRawData);
                 } else {
                     try {
                         BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
                         BigDecimal dmData = Blank.isNotEmpty(dm) ?
                                 MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
                                 bigDecimal;
-//                        RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
-//                                currentYmdhmss, currentYmdh, new Date());
-//                        opcAsyncTask.addTempRawData(rawData);
-
-                        if ( Blank.isNotEmpty(tableReportId)) {
-//                            BigDecimal bigEventValue = new BigDecimal(eventValue);
-//                            if (eventMode.equals(ConstantStr.EVENT_MODEL_EXCEED)) {
-//                                if (dmData.compareTo(bigEventValue) == 1) {
-//                                    eventFlage = true;
-//                                    opcAsyncTask.addEventRawData(rawData);
-//                                } else {
-//                                    eventFlage = false;
-//                                }
-//                            } else if (eventMode.equals(ConstantStr.EVENT_MODEL_LOWER)) {
-//                                if (dmData.compareTo(bigEventValue) == -1) {
-//                                    eventFlage = true;
-//                                    opcAsyncTask.addEventRawData(rawData);
-//                                } else {
-//                                    eventFlage = false;
-//                                }
-//                            }
-                        } else {
-                            if (eventFlage) {
-//                                opcAsyncTask.addEventRawData(rawData);
-                            }
-                        }
+                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
+                                currentYmdhmss, index, currentYmdh, ConstantStr.NOT_MEET_CHANGE, new Date());
+                        opcAsyncTask.addCursorRawData(cursorRawData);
                     } catch (Exception e) {
-//                        RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
-//                                currentYmdhmss, currentYmdh, new Date());
-//                        opcAsyncTask.addTempRawData(rawData);
-//
-//                        if (Blank.isEmpty( tableReportId)) {
-//                            if (eventFlage) {
-//                                opcAsyncTask.addEventRawData(rawData);
-//                            }
-//                        }
+                        CursorRawData cursorRawData = new CursorRawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
+                                currentYmdhmss, index, currentYmdh, ConstantStr.NOT_MEET_CHANGE, new Date());
+                        opcAsyncTask.addCursorRawData(cursorRawData);
                     }
                 }
             } catch (Exception e) {
                 //执行组装数据库的数据,以及生成驱动报表
                 opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-                if ( Blank.isNotEmpty(tableReportId)) {
-                    opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                }
-                redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
 
                 messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                        itemGroup.getGroupName() + "-" + itemId + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
-                        e.getMessage(),
+                        itemGroup.getGroupName() + "-" + itemId + DateUtil.dateChangeStrYmdhms(new Date()) + "运行停止",
+                        OpcUaUtil.genException(e.getMessage()),
                         ConstantStr.NO_READ));
 
+                itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
                 if (Blank.isNotEmpty(opcUaClient)) {
                     opcUaClient.disconnect();
                 }
-
-                redisUtil.del(ConstantStr.ITEM_GROUP + id);
-                itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-                timer.cancel();
+                cronTaskRegister.removeCronTask(cronId);
+                if (Blank.isNotEmpty(timer)) {
+                    timer.cancel();
+                }
             }
         }
-        Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
-        if (Blank.isEmpty(flage)) {
-            flage = false;
-        }
-        if (!flage) {
+        if (System.currentTimeMillis() >= endTime) {
             //执行组装数据库的数据,以及生成驱动报表
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
             opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-            redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
-
-            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行停止",
-                    "请刷新检查组的运行状态",
-                    ConstantStr.NO_READ));
 
             if (Blank.isNotEmpty(opcUaClient)) {
                 opcUaClient.disconnect();
             }
-
-            redisUtil.del(ConstantStr.ITEM_GROUP + id);
-            itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-            timer.cancel();
+            if (Blank.isNotEmpty(timer)) {
+                timer.cancel();
+            }
         }
     }
 }

+ 0 - 196
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaLowerTask.java

@@ -1,196 +0,0 @@
-package com.example.opc_ua.task;
-
-import com.alibaba.fastjson.JSON;
-import com.example.opc_common.entity.*;
-import com.example.opc_common.util.*;
-import com.example.opc_ua.dao.ItemGroupDao;
-import com.example.opc_ua.dao.MessageNoticeDao;
-import com.example.opc_ua.util.OpcUaUtil;
-import com.example.opc_ua.util.RedisUtil;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
-
-import java.math.BigDecimal;
-import java.util.*;
-
-public class OpcUaLowerTask extends TimerTask {
-    private final RedisUtil redisUtil;
-
-    private final OpcAsyncTask opcAsyncTask;
-
-    private final ItemGroupDao itemGroupDao;
-
-    private final MessageNoticeDao messageNoticeDao;
-
-    private final OpcUaClient opcUaClient;
-
-    private final List<Item> itemList;
-
-    private final Map<String, DataModel> map;
-
-    private final ItemGroup itemGroup;
-
-    private final DataSource dataSource;
-
-    private final Timer timer;
-
-    private final String timeFormat;
-
-    private Boolean eventFlage = false;
-
-    public OpcUaLowerTask(RedisUtil redisUtil,
-                          OpcAsyncTask opcAsyncTask,
-                          ItemGroupDao itemGroupDao,
-                          MessageNoticeDao messageNoticeDao,
-                          OpcUaClient opcUaClient,
-                          List<Item> itemList,
-                          Map<String, DataModel> map,
-                          ItemGroup itemGroup,
-                          DataSource dataSource,
-                          Timer timer,
-                          String timeFormat) {
-        this.redisUtil = redisUtil;
-        this.opcAsyncTask = opcAsyncTask;
-        this.itemGroupDao = itemGroupDao;
-        this.messageNoticeDao = messageNoticeDao;
-        this.opcUaClient = opcUaClient;
-        this.itemList = itemList;
-        this.map = map;
-        this.itemGroup = itemGroup;
-        this.dataSource = dataSource;
-        this.timer = timer;
-        this.timeFormat = timeFormat;
-    }
-
-    @Override
-    public void run() {
-        Integer id = itemGroup.getId();
-        Integer dataSourceId = dataSource.getId();
-        for (Item item : itemList) {
-            String itemId = item.getItemReadName();
-            Integer tableReportId = item.getTableReportId();
-            NodeId n = new NodeId(item.getNodeIndex(), itemId);
-            DataModel dm = map.get(itemId);
-            BigDecimal bigModeValue = new BigDecimal(itemGroup.getModeValue());
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id + itemId));
-            try {
-                DataValue dataValue = opcUaClient.readValue(0.0, TimestampsToReturn.Both, n).get();
-                if (!dataValue.getStatusCode().isGood()) {
-                    continue;
-                }
-                Object value = dataValue.getValue().getValue();
-                String javaType = OpcUaUtil.getValType(dataValue);
-                Date time = dataValue.getServerTime().getJavaDate();
-
-                String currentYmdh = DateUtil.dateChangeStr(time, timeFormat);
-                if (Blank.isEmpty(sqlCurrentYmdh)) {
-                    sqlCurrentYmdh = currentYmdh;
-                    redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
-                } else {
-                    if (!sqlCurrentYmdh.equals(currentYmdh)) {
-                        //组装相应的数据
-                        opcAsyncTask.packageRawData(item, dataSourceId, sqlCurrentYmdh);
-                        if ( Blank.isNotEmpty(tableReportId)) {
-                            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                        }
-                        redisUtil.set(ConstantStr.VALUE_BELONG_TIME + id, currentYmdh, ConstantStr.TWO_HOUR);
-                        sqlCurrentYmdh = currentYmdh;
-                    }
-                }
-                String currentYmdhmss = DateUtil.dateChangeStrYmdhmss(time);
-                if (!javaType.toLowerCase().equals("boolean")) {
-                    try {
-                        BigDecimal bigDecimal = JSON.parseObject(value.toString(), BigDecimal.class);
-                        BigDecimal dmData = Blank.isNotEmpty(dm) ?
-                                MathUtil.quadricOperation(dm.getMathParameter(), dm.getOperationRule(), bigDecimal) :
-                                bigDecimal;
-                        if (dmData.compareTo(bigModeValue) == -1) {
-//                            RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(dmData),
-//                                    currentYmdhmss, currentYmdh, new Date());
-//                            opcAsyncTask.addTempRawData(rawData);
-
-                            if ( Blank.isNotEmpty(tableReportId)) {
-//                                BigDecimal bigEventValue = new BigDecimal(eventValue);
-//                                if (eventMode.equals(ConstantStr.EVENT_MODEL_EXCEED)) {
-//                                    if (dmData.compareTo(bigEventValue) == 1) {
-//                                        eventFlage = true;
-//                                        opcAsyncTask.addEventRawData(rawData);
-//                                    } else {
-//                                        eventFlage = false;
-//                                    }
-//                                } else if (eventMode.equals(ConstantStr.EVENT_MODEL_LOWER)) {
-//                                    if (dmData.compareTo(bigEventValue) == -1) {
-//                                        eventFlage = true;
-//                                        opcAsyncTask.addEventRawData(rawData);
-//                                    } else {
-//                                        eventFlage = false;
-//                                    }
-//                                }
-                            } else {
-//                                if (eventFlage) {
-//                                    opcAsyncTask.addEventRawData(rawData);
-//                                }
-                            }
-                        }
-                    } catch (Exception e) {
-//                        RawData rawData = new RawData(id, dataSourceId, itemId, javaType, JSON.toJSONString(value.toString()),
-//                                currentYmdhmss, currentYmdh, new Date());
-//                        opcAsyncTask.addTempRawData(rawData);
-//
-//                        if (Blank.isEmpty(eventMode, eventValue, tableReportId)) {
-//                            if (eventFlage) {
-//                                opcAsyncTask.addEventRawData(rawData);
-//                            }
-//                        }
-                    }
-                }
-            } catch (Exception e) {
-                //执行组装数据库的数据,以及生成驱动报表
-                opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-                if ( Blank.isNotEmpty(tableReportId)) {
-                    opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-                }
-                redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
-
-                messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                        itemGroup.getGroupName() + "-" + itemId + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
-                        e.getMessage(),
-                        ConstantStr.NO_READ));
-
-                if (Blank.isNotEmpty(opcUaClient)) {
-                    opcUaClient.disconnect();
-                }
-
-                redisUtil.del(ConstantStr.ITEM_GROUP + id);
-                itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-                timer.cancel();
-            }
-        }
-        Boolean flage = (Boolean) redisUtil.get(ConstantStr.ITEM_GROUP + id);
-        if (Blank.isEmpty(flage)) {
-            flage = false;
-        }
-        if (!flage) {
-            //执行组装数据库的数据,以及生成驱动报表
-            String sqlCurrentYmdh = String.valueOf(redisUtil.get(ConstantStr.VALUE_BELONG_TIME + id));
-            opcAsyncTask.packageRawDataList(itemList, dataSourceId, sqlCurrentYmdh);
-            opcAsyncTask.eventTable(dataSourceId, itemList, sqlCurrentYmdh);
-            redisUtil.del(ConstantStr.VALUE_BELONG_TIME + id);
-
-            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行停止",
-                    "请刷新检查组的运行状态",
-                    ConstantStr.NO_READ));
-
-            if (Blank.isNotEmpty(opcUaClient)) {
-                opcUaClient.disconnect();
-            }
-
-            redisUtil.del(ConstantStr.ITEM_GROUP + id);
-            itemGroupDao.runItemGroupById(id, ConstantStr.STOP_IT);
-            timer.cancel();
-        }
-    }
-}

+ 128 - 111
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaTask.java

@@ -1,138 +1,155 @@
 package com.example.opc_ua.task;
 
-import com.example.opc_common.entity.*;
+import com.example.opc_common.entity.DataModel;
+import com.example.opc_common.entity.DataSource;
+import com.example.opc_common.entity.Item;
+import com.example.opc_common.entity.ItemGroup;
 import com.example.opc_common.enums.ResultEnum;
+import com.example.opc_common.exception.CustomException;
 import com.example.opc_common.util.Blank;
 import com.example.opc_common.util.ConstantStr;
 import com.example.opc_common.util.DateUtil;
-import com.example.opc_common.util.Result;
+import com.example.opc_ua.dao.DataModelDao;
 import com.example.opc_ua.dao.ItemGroupDao;
 import com.example.opc_ua.dao.MessageNoticeDao;
+import com.example.opc_ua.dynamicSchedule.CronTaskRegister;
 import com.example.opc_ua.util.OpcUaUtil;
 import com.example.opc_ua.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
 
 import java.util.*;
 
 @Slf4j
-public class OpcUaTask extends TimerTask {
-
-    private final RedisUtil redisUtil;
-
-    private final OpcAsyncTask opcAsyncTask;
-
-    private final ItemGroupDao itemGroupDao;
-
-    private final MessageNoticeDao messageNoticeDao;
-
-    private final Timer pTimer;
-
-    private final ItemGroup itemGroup;
-
-    private final DataSource dataSource;
-
-    private final Map<String, DataModel> map;
-
-    private final List<Item> itemList;
-
-    private final String timeFormat;
-
-    public OpcUaTask(RedisUtil redisUtil,
-                     OpcAsyncTask opcAsyncTask,
-                     ItemGroupDao itemGroupDao,
-                     MessageNoticeDao messageNoticeDao,
-                     Timer pTimer,
-                     ItemGroup itemGroup,
-                     DataSource dataSource,
-                     Map<String, DataModel> map,
-                     List<Item> itemList,
-                     String timeFormat) {
-        this.redisUtil = redisUtil;
-        this.opcAsyncTask = opcAsyncTask;
-        this.itemGroupDao = itemGroupDao;
-        this.messageNoticeDao = messageNoticeDao;
-        this.pTimer = pTimer;
-        this.itemGroup = itemGroup;
-        this.dataSource = dataSource;
-        this.map = map;
-        this.itemList = itemList;
-        this.timeFormat = timeFormat;
-    }
+@Component
+public class OpcUaTask {
+
+    @Autowired
+    private DataModelDao dataModelDao;
+
+    @Autowired
+    private ItemGroupDao itemGroupDao;
+
+    @Autowired
+    private OpcAsyncTask opcAsyncTask;
+
+    @Autowired
+    private CronTaskRegister cronTaskRegister;
+
+    @Autowired
+    private RedisUtil redisUtil;
+
+    @Autowired
+    private MessageNoticeDao messageNoticeDao;
+
+    @Value("${opc_storage.time_format}")
+    private String timeFormat;
+
+    public void opcUaTask(ItemGroup itemGroup, DataSource dataSource, String cronId) {
+        Integer id = itemGroup.getId();
+        Integer readMode = itemGroup.getReadMode();
+        String startTime = DateUtil.getCurrentYmd() + " " + itemGroup.getStartReadTime();
+        String endTime = DateUtil.getCurrentYmd() + " " + itemGroup.getEndReadTime();
+        Date startDate = DateUtil.strYmdhmsChangeDate(startTime);
+        Date endDate = DateUtil.strYmdhmsChangeDate(endTime);
+
+        Boolean flage = false;
+        //获取组中所有标签的数据模型,并转换为map<itemName,DataModel>
+        List<Item> allItemList = itemGroupDao.getItemListByGroupId(id);
+        itemGroup.setItemList(allItemList);
+        if (Blank.isNotEmpty(allItemList)) {
+            for (Item item : allItemList) {
+                if (Blank.isNotEmpty(item.getDataModelId())) {
+                    flage = true;
+                    break;
+                }
+            }
+        }
+        Map<String, DataModel> map = new HashMap<>();
+        if (flage) {
+            List<DataModel> dmListByItemList = dataModelDao.getDmListByItemList(allItemList);
+            if (Blank.isNotEmpty(dmListByItemList)) {
+                for (int i = 0; i < allItemList.size(); i++) {
+                    for (DataModel dm : dmListByItemList) {
+                        if (dm.getId().equals(allItemList.get(i).getDataModelId())) {
+                            map.put(allItemList.get(i).getItemReadName(), dm);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
 
-    @Override
-    public void run() {
+        List<Item> itemList = itemGroupDao.getItemListByGroupId(id);
         OpcUaClient opcUaClient = null;
+        Timer timer = new Timer();
         try {
             opcUaClient = OpcUaUtil.createClient(dataSource);
-            Timer timer = new Timer();
-            if (itemGroup.getReadMode() == ConstantStr.ON_FREQUENCY) {
-                opcUaClient.connect().get();
-                timer.schedule(new OpcUaFrequencyTask(redisUtil,
-                                opcAsyncTask,
-                                itemGroupDao,
-                                messageNoticeDao,
-                                opcUaClient,
-                                itemList,
-                                map,
-                                itemGroup,
-                                dataSource,
-                                timer,
-                                timeFormat),
-                        0, (int) (Math.round(itemGroup.getModeValue() * 1000)));
-            } else if (itemGroup.getReadMode() == ConstantStr.ON_CHANGE) {
-                timer.schedule(new OpcUaChangeTask(redisUtil,
-                                opcAsyncTask,
-                                itemGroupDao,
-                                messageNoticeDao,
-                                opcUaClient,
-                                itemList,
-                                map,
-                                itemGroup,
-                                dataSource,
-                                timer,
-                                timeFormat),
-                        0, 1000);
-            } else if (itemGroup.getReadMode() == ConstantStr.EXCEED_SET_VALUE) {
-                timer.schedule(new OpcUaExceedTask(redisUtil,
-                                opcAsyncTask,
-                                itemGroupDao,
-                                messageNoticeDao,
-                                opcUaClient,
-                                itemList,
-                                map,
-                                itemGroup,
-                                dataSource,
-                                timer,
-                                timeFormat),
-                        0, 1000);
-            } else if (itemGroup.getReadMode() == ConstantStr.LOWER_SET_VALUE) {
-                timer.schedule(new OpcUaLowerTask(redisUtil,
-                                opcAsyncTask,
-                                itemGroupDao,
-                                messageNoticeDao,
-                                opcUaClient,
-                                itemList,
-                                map,
-                                itemGroup,
-                                dataSource,
-                                timer,
-                                timeFormat),
-                        0, 1000);
-            }
         } catch (Exception e) {
-            String message = OpcUaUtil.genException(e.getMessage());
-            messageNoticeDao.addMessageNotice(new MessageNotice(itemGroup.getUserId(),
-                    itemGroup.getGroupName() + DateUtil.dateChangeStrYmdhms(new Date()) + "运行失败",
-                    message,
-                    ConstantStr.NO_READ));
+            itemGroupDao.stopItemGroupById(id, ConstantStr.EXCEPT_STOP_UP);
             if (Blank.isNotEmpty(opcUaClient)) {
                 opcUaClient.disconnect();
             }
-            redisUtil.del(ConstantStr.ITEM_GROUP + itemGroup.getId());
-            itemGroupDao.runItemGroupById(itemGroup.getId(), ConstantStr.STOP_IT);
-            pTimer.cancel();
+            if (Blank.isNotEmpty(timer)) {
+                timer.cancel();
+            }
+            cronTaskRegister.removeCronTask(cronId);
+        }
+        if (readMode == ConstantStr.ON_FREQUENCY) {
+            timer.schedule(new OpcUaFrequencyTask(
+                            opcAsyncTask,
+                            cronId,
+                            cronTaskRegister,
+                            itemGroupDao,
+                            messageNoticeDao,
+                            opcUaClient,
+                            itemList,
+                            map,
+                            itemGroup,
+                            dataSource,
+                            timer,
+                            timeFormat,
+                            endDate.getTime()),
+                    startDate, (int) (Math.round(itemGroup.getModeValue() * 1000)));
+        } else if (readMode == ConstantStr.ON_CHANGE) {
+            timer.schedule(new OpcUaChangeTask(
+                            redisUtil,
+                            opcAsyncTask,
+                            cronId,
+                            cronTaskRegister,
+                            itemGroupDao,
+                            messageNoticeDao,
+                            opcUaClient,
+                            itemList,
+                            map,
+                            itemGroup,
+                            dataSource,
+                            timer,
+                            timeFormat,
+                            endDate.getTime()),
+                    startDate, (int) (Math.round(itemGroup.getModeValue() * 1000)));
+        } else if (readMode == ConstantStr.EXCEED_SET_VALUE) {
+            timer.schedule(new OpcUaExceedTask(
+                            opcAsyncTask,
+                            cronId,
+                            cronTaskRegister,
+                            itemGroupDao,
+                            messageNoticeDao,
+                            opcUaClient,
+                            itemList,
+                            map,
+                            itemGroup,
+                            dataSource,
+                            timer,
+                            timeFormat,
+                            endDate.getTime()),
+                    startDate, (int) (Math.round(itemGroup.getModeValue() * 1000)));
+        } else {
+            cronTaskRegister.removeCronTask(cronId);
+            throw new CustomException(ResultEnum.REQUEST_WRONGPARAMS.getRespCode(), "目前未适配此种采样模式");
         }
 
     }

+ 0 - 91
chaunyi_opc/opc_ua/src/main/java/com/example/opc_ua/task/OpcUaWeekTask.java

@@ -1,91 +0,0 @@
-package com.example.opc_ua.task;
-
-import com.example.opc_common.entity.DataModel;
-import com.example.opc_common.entity.DataSource;
-import com.example.opc_common.entity.Item;
-import com.example.opc_common.entity.ItemGroup;
-import com.example.opc_common.util.Blank;
-import com.example.opc_common.util.ConstantStr;
-import com.example.opc_common.util.DateUtil;
-import com.example.opc_ua.dao.DataModelDao;
-import com.example.opc_ua.dao.ItemGroupDao;
-import com.example.opc_ua.dao.MessageNoticeDao;
-import com.example.opc_ua.util.RedisUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-
-@Slf4j
-@Component
-public class OpcUaWeekTask {
-
-    @Autowired
-    private DataModelDao dataModelDao;
-
-    @Autowired
-    private ItemGroupDao itemGroupDao;
-
-    @Autowired
-    private OpcAsyncTask opcAsyncTask;
-
-    @Autowired
-    private RedisUtil redisUtil;
-
-    @Autowired
-    private MessageNoticeDao messageNoticeDao;
-
-    @Value("${opc_storage.time_format}")
-    private String timeFormat;
-
-    public void opcUaTask(ItemGroup itemGroup, DataSource dataSource) {
-        Integer id = itemGroup.getId();
-        Boolean flage = false;
-        //获取组中所有标签的数据模型,并转换为map<itemName,DataModel>
-        List<Item> allItemList = itemGroupDao.getItemListByGroupId(id);
-        itemGroup.setItemList(allItemList);
-        if (Blank.isNotEmpty(allItemList)) {
-            for (Item item : allItemList) {
-                if (Blank.isNotEmpty(item.getDataModelId())) {
-                    flage = true;
-                    break;
-                }
-            }
-        }
-        Map<String, DataModel> map = new HashMap<>();
-        if (flage) {
-            List<DataModel> dmListByItemList = dataModelDao.getDmListByItemList(allItemList);
-            if (Blank.isNotEmpty(dmListByItemList)) {
-                for (int i = 0; i < allItemList.size(); i++) {
-                    for (DataModel dm : dmListByItemList) {
-                        if (dm.getId().equals(allItemList.get(i).getDataModelId())) {
-                            map.put(allItemList.get(i).getItemReadName(), dm);
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-
-        Timer timer = new Timer();
-        List<Item> itemList = itemGroupDao.getItemListByGroupId(id);
-        //异步读取opcUA数据并保存
-        timer.schedule(new OpcUaTask(redisUtil,
-                        opcAsyncTask,
-                        itemGroupDao,
-                        messageNoticeDao,
-                        timer,
-                        itemGroup,
-                        dataSource,
-                        map,
-                        itemList,
-                        timeFormat),
-                DateUtil.strYmdhmsChangeDate(DateUtil.getCurrentYmd() + " " + itemGroup.getStartReadTime()), ConstantStr.PERIOD_DAY);
-
-    }
-}

+ 10 - 1
chaunyi_opc/opc_ua/src/main/resources/mapper/ItemGroupDao.xml

@@ -33,11 +33,20 @@
         select
         <include refid="itemGroup"/>
         from t_item_group
-        where user_id = #{userId}
+        <if test="userId != null and userId != ''">
+            where user_id = #{userId}
+        </if>
     </select>
 
     <update id="runItemGroupById">
         update t_item_group
+        set run_state=#{runState},
+            cron_id=#{cronId}
+        where id = #{id}
+    </update>
+
+    <update id="stopItemGroupById">
+        update t_item_group
         set run_state=#{runState}
         where id = #{id}
     </update>

+ 51 - 1
chaunyi_opc/opc_ua/src/main/resources/mapper/RawDataDao.xml

@@ -2,6 +2,11 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.example.opc_ua.dao.RawDataDao">
 
+    <sql id="cursorRawData">
+        id
+        , item_group_id, data_source_id, item_name, data_type, data_value, data_value_time, data_index, value_belong_time, is_meet_change, create_time
+    </sql>
+
     <sql id="rawData">
         id
         , item_group_id, data_source_id, item_name, data_type, data_value, data_value_time, value_belong_time, create_time
@@ -27,7 +32,7 @@
         </foreach>
     </insert>
 
-    <insert id="addTempRawData">
+    <insert id="addCursorRawData">
         insert into t_raw_data
         (item_group_id, data_source_id, item_name, data_type, data_value, data_value_time, value_belong_time,
          create_time)
@@ -73,6 +78,14 @@
           and value_belong_time = #{valueBelongTime}
     </delete>
 
+    <delete id="delCursorRawDataList">
+        delete
+        from t_raw_data
+        where item_group_id = #{itemGroupId}
+          and data_source_id = #{dataSourceId}
+          and value_belong_time = #{valueBelongTime}
+    </delete>
+
     <select id="getRawDataList" resultType="com.example.opc_common.entity.RawData">
         select
         <include refid="rawData"/>
@@ -104,4 +117,41 @@
         and value_belong_time = #{valueBelongTime}
     </select>
 
+    <select id="getMeetIndexList" resultType="java.lang.Long">
+        SELECT
+        data_index
+        FROM
+        t_raw_data
+        WHERE
+        item_group_id = #{itemGroupId}
+        AND data_source_id = #{dataSourceId}
+        <if test="itemList!= null and itemList.size() >0">
+            AND item_name IN
+            <foreach collection="itemList" item="item" separator="," open="(" close=")">
+                #{item.itemReadName}
+            </foreach>
+        </if>
+        AND value_belong_time = #{valueBelongTime}
+        AND is_meet_change = #{notMeetChange}
+        GROUP BY
+        data_index;
+    </select>
+
+    <select id="getCursorRawDataList" resultType="com.example.opc_common.entity.CursorRawData">
+        <if test="itemList!= null and itemList.size() >0">
+            select
+            <include refid="cursorRawData"/>
+            from t_raw_data
+            where item_group_id = #{item.itemGroupId}
+            and data_source_id = #{dataSourceId}
+            and item_name = #{item.itemReadName}
+            and value_belong_time = #{valueBelongTime}
+            and data_index in
+            <foreach collection="indexList" item="index" separator="," open="(" close=")">
+                #{index}
+            </foreach>
+            order by data_index desc
+        </if>
+    </select>
+
 </mapper>