소스 검색

1、报表数据采集策略定义一个读取频率为5s/条的策略组(选择的是一个5s/条的数据组),但报表数据库中存储的数据为10s/条
2、采集器采集频率以天为单位启动报错问题

lhy 8 달 전
부모
커밋
ac03a00cd9
18개의 변경된 파일415개의 추가작업 그리고 26개의 파일을 삭제
  1. 1 1
      industry-admin/.env.production
  2. 7 7
      industry-admin/src/views/collector/index.vue
  3. 22 0
      industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/mapper/QueryMapper.java
  4. 1 1
      industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/QueryService.java
  5. 19 2
      industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/impl/QueryServiceImpl.java
  6. 46 1
      industry-system/cqcy-ei-influxdb/src/main/resources/flux/QueryMapper.xml
  7. 4 4
      industry-system/industry-da/pom.xml
  8. 1 1
      industry-system/industry-da/src/main/java/com/example/opc_da/alarmConfig/DatasouceAlarmTask.java
  9. 6 1
      industry-system/industry-da/src/main/java/com/example/opc_da/dao/CollectorDao.java
  10. 75 0
      industry-system/industry-da/src/main/java/com/example/opc_da/policy/ChangeReportDataPolicyTask.java
  11. 87 0
      industry-system/industry-da/src/main/java/com/example/opc_da/policy/EventReportDataPolicyTask.java
  12. 99 4
      industry-system/industry-da/src/main/java/com/example/opc_da/policy/FreReportDataPolicyTask.java
  13. 6 1
      industry-system/industry-da/src/main/java/com/example/opc_da/policy/ReportDataPolicyTask.java
  14. 8 0
      industry-system/industry-da/src/main/java/com/example/opc_da/service/CollectorService.java
  15. 9 0
      industry-system/industry-da/src/main/java/com/example/opc_da/service/impl/CollectorServiceImpl.java
  16. 1 1
      industry-system/industry-da/src/main/java/com/example/opc_da/util/QueryServiceUtil.java
  17. 8 0
      industry-system/industry-da/src/main/resources/mapper/CollectorDao.xml
  18. 15 2
      新采集器/fast-api/src/main/java/com/ws/fastapi/util/ConfUtil.java

+ 1 - 1
industry-admin/.env.production

@@ -2,5 +2,5 @@
 ENV = 'production'
 
 # base api
+#VUE_APP_BASE_API = 'http://localhost:8081'
 VUE_APP_BASE_API = 'http://10.220.0.124:8081'
-

+ 7 - 7
industry-admin/src/views/collector/index.vue

@@ -111,7 +111,7 @@
           </el-col>
           <el-col :span="1">
             <el-tooltip class="item" effect="dark"
-                        content="将采集的间隔时间取整。比如,如果interval设置为10s,但我们在1分02秒启动了telegraf服务,那么采集的时间会取整到1分10秒,1分20秒,1分30秒"
+                        content="将采集的间隔时间取整。比如,如果采集间隔设置为10s,但我们在1分02秒启动了采集器服务,那么采集的时间会取整到1分10秒,1分20秒,1分30秒"
                         placement="top">
               <i class="el-icon-question" style="line-height: 36px;margin-left: 5px;"></i>
             </el-tooltip>
@@ -125,7 +125,7 @@
           </el-col>
           <el-col :span="1">
             <el-tooltip class="item" effect="dark"
-                        content="所有output的输出间隔,这个参数不应该设的比interval(所有input组件的采集间隔)小"
+                        content="数据缓冲区的输出间隔,这个参数不应设的比数据组所有采集间隔小"
                         placement="top">
               <i class="el-icon-question" style="line-height: 36px;margin-left: 5px;"></i>
             </el-tooltip>
@@ -141,7 +141,7 @@
           </el-col>
           <el-col :span="1">
             <el-tooltip class="item" effect="dark"
-                        content="telegraf一批次从output组件向外发送数据的大小,网络不稳定时可以减小此参数。"
+                        content="采集器一批次从缓冲区向外发送数据的大小,网络不稳定时可以减小此参数。"
                         placement="top">
               <i class="el-icon-question" style="line-height: 36px;margin-left: 5px;"></i>
             </el-tooltip>
@@ -155,7 +155,7 @@
           </el-col>
           <el-col :span="1">
             <el-tooltip class="item" effect="dark"
-                        content="telegraf会为每个output插件创建一个缓冲区,来缓存指标数据,并在output成功将数据发送后,将成功发送的数据从缓冲区删除。所以,metriac_buffer_limit参数应该至少是metric_batch_size参数的两倍"
+                        content="采集器会创建一个缓冲区,来缓存指标数据,并在缓冲区成功将数据发送后,将成功发送的数据从缓冲区删除。所以,缓存量参数应该至少是发送量参数的两倍"
                         placement="top">
               <i class="el-icon-question" style="line-height: 36px;margin-left: 5px;"></i>
             </el-tooltip>
@@ -185,7 +185,7 @@
           </el-col>
           <el-col :span="1">
             <el-tooltip class="item" effect="dark"
-                        content="对output的输出时间加上一个随机的抖动,这主要是为了避免大量的Telegraf实例在同样的时间同时执行写入操作,出现较大的写入峰值。比如,flush_jitter设为5s,flush_interval设为10s意味着会在10~15秒的时候进行一次输出。"
+                        content="对缓冲区的输出时间加上一个随机的抖动,这主要是为了避免大量的采集器实例在同样的时间同时执行写入操作,出现较大的写入峰值。比如,刷新抖动设为5s,刷新间隔设为10s意味着会在10~15秒的时候进行一次输出。"
                         placement="top">
               <i class="el-icon-question" style="line-height: 36px;margin-left: 5px;"></i>
             </el-tooltip>
@@ -345,7 +345,7 @@ export default {
         // 在采集的时间点上加一个随机的抖动,这样可以避免很多插件同时查询一些消耗资源的指标
         collectionJitter: '0',
         // output的输出间隔,这个参数不应该设的比interval
-        flushInterval: '10',
+        flushInterval: '1',
         // output的输出时间加上一个随机的抖动,这主要是为了避免大量的Telegraf实例在同样的时间同时执行写入操作
         flushJitter: '0',
         // 收集的指标四舍五入为指定的精度
@@ -570,7 +570,7 @@ export default {
           metricBatchSize: (data.metricBatchSize || 1000).toString(),
           metricBufferLimit: (data.metricBufferLimit || 10000).toString(),
           collectionJitter: (data.collectionJitter || 0).toString(),
-          flushInterval: (data.flushInterval || 10).toString(),
+          flushInterval: (data.flushInterval || 1).toString(),
           flushJitter: (data.flushJitter || 0).toString(),
           precision: data.precision || '',
           hostname: data.hostname || '',

+ 22 - 0
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/mapper/QueryMapper.java

@@ -21,6 +21,28 @@ public interface QueryMapper {
     List<Item> getItemDataByLast(@Param("startTime") Long startTime, @Param("endTime") Long endTime,
                                  @Param("items") List<String> items, @Param("table") String dataSource, @Param("myBucket") String myBucket, @Param("myTable") String myTable);
 
+
+
+    /**
+     * 查询时间范围内的最新一批数据
+     * @param startTime 开始时间
+     * @param endTime 结束时间
+     * @param items 点位
+     * @param dataSource 标识
+     * @param groupByInterval 间隔秒数, 返回数据聚合秒数 如:间隔秒数为"15s",返回数据时的秒数位为 0 15 30 45。
+     * @return List<Item>
+     */
+    @QueryApi
+    List<Item> getItemDataByLastN(@Param("startTime") Long startTime, @Param("endTime") Long endTime,
+                                 @Param("items") List<String> items, @Param("table") String dataSource,
+                                  @Param("myBucket") String myBucket, @Param("myTable") String myTable,
+                                  @Param("groupByInterval") String groupByInterval);
+    @QueryApi
+    List<Object> getItemDataByLastN2(@Param("startTime") Long startTime, @Param("endTime") Long endTime,
+                                  @Param("items") List<String> items, @Param("table") String dataSource,
+                                     @Param("myBucket") String myBucket, @Param("myTable") String myTable,
+                                     @Param("groupByInterval") String groupByInterval);
+
     /**
      * 查询时间范围内的历史数据
      * @param startTime 开始时间

+ 1 - 1
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/QueryService.java

@@ -23,7 +23,7 @@ public interface QueryService {
      * @param dataSource 点位数据源
      * @return List<Item>
      */
-    List<Item> getItemDataHistoryByLast(List<String> items, String dataSource);
+    List<Item> getItemDataHistoryByLast(List<String> items, String dataSource, String groupByInterval);
 
     /**
      * 查询点位集合历史数据

+ 19 - 2
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/impl/QueryServiceImpl.java

@@ -1,7 +1,9 @@
 package com.cqcy.ei.influxdb.service.impl;
 
 import cn.hutool.core.convert.Convert;
+//import cn.hutool.core.date.DatePattern;
 import cn.hutool.core.date.DateUtil;
+//import cn.hutool.core.util.StrUtil;
 import com.cqcy.ei.influxdb.config.InfluxDBProperties;
 import com.cqcy.ei.influxdb.entity.Item;
 import com.cqcy.ei.influxdb.entity.RangeEnum;
@@ -12,6 +14,7 @@ import com.cqcy.ei.influxdb.util.InfluxDBUtil;
 import lombok.AllArgsConstructor;
 import org.springframework.stereotype.Service;
 
+//import java.text.DateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -44,9 +47,13 @@ public class QueryServiceImpl implements QueryService {
 
 
     @Override
-    public List<Item> getItemDataHistoryByLast(List<String> items, String dataSource) {
+    public List<Item> getItemDataHistoryByLast(List<String> items, String dataSource, String groupByInterval) {
         Date startTime = new Date();
         Date endTime = RangeEnum.HOUR.getEndTime(startTime, 24);
+        //System.out.println("startTime:" + endTime.toInstant().getEpochSecond());
+        //System.out.println("startTime:" + DateUtil.format(endTime, DatePattern.NORM_DATETIME_MS_FORMAT));
+        //System.out.println("endTime:" + startTime.toInstant().getEpochSecond());
+        //System.out.println("endTime:" + DateUtil.format(startTime, DatePattern.NORM_DATETIME_MS_FORMAT));
         String bucket = influxDBProperties.getBucket();
         String measurement = influxDBProperties.getMeasurement();
         if (influxDBProperties.getSubType() > 0) {
@@ -56,7 +63,17 @@ public class QueryServiceImpl implements QueryService {
                 measurement += "_" + DateUtil.year(date) + "_" + (DateUtil.month(date) + 1);
             }
         }
-        return queryMapper.getItemDataByLast(endTime.toInstant().getEpochSecond(), startTime.toInstant().getEpochSecond() - 2, items, dataSource, bucket, measurement);
+        /*if(StrUtil.isEmpty(groupByInterval)){
+            groupByInterval = "1ms";
+        }else if(groupByInterval.indexOf("s") < 0){
+            groupByInterval += "s";
+        }*/
+        List<Item> itemDataByLastN = queryMapper.getItemDataByLastN(endTime.toInstant().getEpochSecond(),
+                startTime.toInstant().getEpochSecond(), items, dataSource, bucket, measurement, groupByInterval);
+        /*System.out.println(queryMapper.getItemDataByLastN2(endTime.toInstant().getEpochSecond(),
+                startTime.toInstant().getEpochSecond(), items, dataSource, bucket, measurement, groupByInterval));*/
+
+        return itemDataByLastN;
     }
 
     @Override

+ 46 - 1
industry-system/cqcy-ei-influxdb/src/main/resources/flux/QueryMapper.xml

@@ -21,6 +21,51 @@
             |> last()
     </select>
 
+    <!--查询时间范围内的最新一批数据-->
+    <select id="getItemDataByLastN">
+        from(bucket: #{myBucket})
+        |> range(start: ${startTime}, stop: ${endTime})
+        |> filter(fn: (r) => r["_measurement"] == #{myTable})
+        |> filter(fn: (r) => r["name"] == #{table} or r["ItemGroup"] == #{table})
+        |> filter(fn: (r) => r["_field"] =~
+        <trim suffixOverrides=" " prefixOverrides=" ">
+            <foreach collection="items" item="item" separator="|" open="/" close="/">${item}</foreach>
+        </trim>
+        )
+        <!--<if test="groupByInterval != null and groupByInterval != ''">
+            |> aggregateWindow(every: ${groupByInterval}, fn: last, createEmpty: false)
+        </if>-->
+        |> map(fn: (r) => ({
+        _field: r["_field"],
+        _value: r["_value"],
+        _time: r["_time"]
+        }))
+        |> last()
+    </select>
+
+    <!--查询时间范围内的最新一批数据-->
+    <select id="getItemDataByLastN2">
+        from(bucket: #{myBucket})
+        |> range(start: ${startTime}, stop: ${endTime})
+        |> filter(fn: (r) => r["_measurement"] == #{myTable})
+        |> filter(fn: (r) => r["name"] == #{table} or r["ItemGroup"] == #{table})
+        |> filter(fn: (r) => r["_field"] =~
+        <trim suffixOverrides=" " prefixOverrides=" ">
+            <foreach collection="items" item="item" separator="|" open="/" close="/">${item}</foreach>
+        </trim>
+        )
+        <!--<if test="groupByInterval != null and groupByInterval != ''">
+            |> aggregateWindow(every: ${groupByInterval}, fn: last, createEmpty: false)
+        </if>-->
+        |> map(fn: (r) => ({
+        _field: r["_field"],
+        _value: r["_value"],
+        _time: r["_time"]
+        }))
+        |> sort(columns: ["_time"] ,desc: true)
+        |> limit(n:1)
+    </select>
+
     <!--查询时间范围内的历史数据-->
     <select id="getItemDataHistory">
         <foreach collection="list" item="item">
@@ -71,4 +116,4 @@
             |> sort(columns: ["ItemGroup", "_field", "_time"])
     </select>
 
-</mapper>
+</mapper>

+ 4 - 4
industry-system/industry-da/pom.xml

@@ -9,12 +9,12 @@
     </parent>
 
     <groupId>com.example</groupId>
-    <artifactId>industry-da</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
+    <artifactId>ei-platform</artifactId>
+    <version>1.0.0</version>
     <!-- 修改opc_da模块的打包方式为jar,为mapper模块提供依赖 -->
     <packaging>jar</packaging>
-    <name>industry-da</name>
-    <description>industry-da</description>
+    <name>ei-platform</name>
+    <description>ei-platform</description>
 
     <dependencies>
         <dependency>

+ 1 - 1
industry-system/industry-da/src/main/java/com/example/opc_da/alarmConfig/DatasouceAlarmTask.java

@@ -81,7 +81,7 @@ public class DatasouceAlarmTask implements DisposableBean {
             }
             future = threadPool.scheduleAtFixedRate(() -> {
                 //获取点位最新的数据
-                List<com.cqcy.ei.influxdb.entity.Item> itemDataByLast = queryService.getItemDataHistoryByLast(items, dataSourceName);
+                List<com.cqcy.ei.influxdb.entity.Item> itemDataByLast = queryService.getItemDataByLast(items, dataSourceName);
                 if (CollUtil.isNotEmpty(alarmConfigList) && CollUtil.isNotEmpty(itemDataByLast)) {
                     if (CollUtil.isEmpty(lastItemDataList)) {
                         generateAlarms(alarmConfigList, itemDataByLast);

+ 6 - 1
industry-system/industry-da/src/main/java/com/example/opc_da/dao/CollectorDao.java

@@ -115,4 +115,9 @@ public interface CollectorDao {
      * @return
      */
     List<String> getReportDataPolicyNameByClientId(Integer clientId);
-}
+
+    /**
+     * 查询采集数据组的采集间隔
+     * @return
+     */
+    /**List<Map<String, Object>> getItemIntervalByReportDataPolicyId(@Param("reportDataPolicyId") Integer reportDataPolicyId);**/

+ 75 - 0
industry-system/industry-da/src/main/java/com/example/opc_da/policy/ChangeReportDataPolicyTask.java

@@ -1,6 +1,9 @@
 package com.example.opc_da.policy;
 
 import cn.hutool.core.collection.CollUtil;
+//import cn.hutool.core.date.DatePattern;
+//import cn.hutool.core.date.DateUtil;
+//import cn.hutool.core.date.StopWatch;
 import com.cqcy.ei.influxdb.entity.Item;
 import com.example.opc_common.entity.ReportDataPolicy;
 import com.example.opc_common.entity.ReportDataPolicyItem;
@@ -17,6 +20,7 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+//import java.util.stream.IntStream;
 
 @Getter
 public class ChangeReportDataPolicyTask extends ReportDataPolicyTask {
@@ -88,6 +92,77 @@ public class ChangeReportDataPolicyTask extends ReportDataPolicyTask {
                 lastPolicyItemList = currentPolicyItemList;
                 currentPolicyItemList = new ArrayList<>(policyItemList);
             }
+            /*@Override
+            public void run() {
+                try {
+                //判断数据组是否到达了,今天的结束时间
+                Long currentTime = System.currentTimeMillis();
+                if (currentTime<=startTime || currentTime >= endTime){
+                    stop();
+                    return;
+                }
+                //查询此报表策略最新一轮数据
+                List<Item> itemDataByLast = getItemDataHistoryByLast();
+
+                // 过滤已经插入时间对应的数据
+                // 拆分为多条数据
+                Map<String, List<Item>> itemDataMapByTime = itemDataByLast.stream().filter(item -> {
+                    if(lastItemDataByLast.size() == 0){
+                        return true;
+                    }
+                    return DateUtil.parse(lastItemDataByLast.get(0).getTime(), DatePattern.NORM_DATETIME_MS_FORMAT)
+                            .before(DateUtil.parse(item.getTime(), DatePattern.NORM_DATETIME_MS_FORMAT));
+                }).collect(Collectors.groupingBy(Item::getTime));
+
+                if(itemDataMapByTime.isEmpty()){
+                    // 无新增数据
+                    return;
+                }
+
+                // 数据时间正排
+                List<String> collect = itemDataMapByTime.keySet().stream().sorted().collect(Collectors.toList());
+
+                // 循环插入新增的数据
+                IntStream.range(0,collect.size()).forEach(index -> {
+
+                    //将得到的数据转换为Map<点位名称,Item实体类>
+                    Map<String, Item> map = itemDataMapByTime.get(collect.get(index)).stream().collect(Collectors.toMap(Item::getName, i -> i));
+                    //将报表采集策略的点位,赋值相应的值
+                    for(ReportDataPolicyItem policyItem : currentPolicyItemList){
+                        Item item = map.get(policyItem.getItemReadName());
+                        QueryServiceUtil.itemParentCountValue(policyItem, item);
+                    }
+
+                    if(index > 0){
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    //判断数据是否符合变化率,并将符合的往过滤数据库添加
+                    addPolicyItemList(currentPolicyItemList.stream()
+                            .filter(p ->
+                                    MathUtil.isMeetChange(
+                                            new BigDecimal(lastPolicyItemList.stream()
+                                                    .filter(i -> p.getId().equals(i.getId()))
+                                                    .findFirst().get().getDataValue())
+                                            , new BigDecimal(p.getDataValue())
+                                            , modelValue
+                                            , readModeType
+                                    ))
+                            .collect(Collectors.toList()));
+
+                    lastItemDataByLast = itemDataMapByTime.get(collect.get(index));
+                    lastPolicyItemList = currentPolicyItemList;
+                    currentPolicyItemList = new ArrayList<>(policyItemList);
+                });
+
+                }catch (Exception e){
+                    e.printStackTrace();
+                }
+            }*/
         };
     }
 

+ 87 - 0
industry-system/industry-da/src/main/java/com/example/opc_da/policy/EventReportDataPolicyTask.java

@@ -2,6 +2,8 @@ package com.example.opc_da.policy;
 
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.convert.Convert;
+//import cn.hutool.core.date.DatePattern;
+//import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.extra.servlet.ServletUtil;
 import com.cqcy.ei.influxdb.entity.Item;
@@ -23,6 +25,7 @@ import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+//import java.util.stream.IntStream;
 
 @Slf4j
 @Getter
@@ -178,6 +181,90 @@ public class EventReportDataPolicyTask extends ReportDataPolicyTask {
                 lastPolicyItemList = currentPolicyItemList;
                 currentPolicyItemList = new ArrayList<>(policyItemList);
             }
+
+
+            /*@Override
+            public void run() {
+                try {
+                //判断数据组是否到达了,今天的结束时间
+                Long currentTime = System.currentTimeMillis();
+                if (currentTime<=startTime || currentTime >= endTime){
+                    stop();
+                    return;
+                }
+                //查询此报表策略最新一轮数据
+                List<Item> itemDataByLast = getItemDataHistoryByLast();
+
+                // 过滤已经插入时间对应的数据
+                // 拆分为多条数据
+                Map<String, List<Item>> itemDataMapByTime = itemDataByLast.stream().filter(item -> {
+                    if(lastItemDataByLast.size() == 0){
+                        return true;
+                    }
+                    return DateUtil.parse(lastItemDataByLast.get(0).getTime(), DatePattern.NORM_DATETIME_MS_FORMAT)
+                            .before(DateUtil.parse(item.getTime(), DatePattern.NORM_DATETIME_MS_FORMAT));
+                }).collect(Collectors.groupingBy(Item::getTime));
+
+                if(itemDataMapByTime.isEmpty()){
+                    // 无新增数据
+                    return;
+                }
+                // 数据时间正排
+                List<String> collect = itemDataMapByTime.keySet().stream().sorted().collect(Collectors.toList());
+
+                // 循环插入新增的数据
+                IntStream.range(0,collect.size()).forEach(index -> {
+
+                    //将得到的数据转换为Map<点位名称,Item实体类>
+                    Map<String, Item> map = itemDataMapByTime.get(collect.get(index)).stream().collect(Collectors.toMap(Item::getName, i -> i));
+                    //将报表采集策略的点位,赋值相应的值
+                    for(ReportDataPolicyItem policyItem : currentPolicyItemList){
+                        Item item = map.get(policyItem.getItemReadName());
+                        QueryServiceUtil.itemParentCountValue(policyItem, item);
+                    }
+                    if(index > 0){
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    //判断事件驱动项是否符合事件驱动条件
+                    BigDecimal dataValue = getEventValue(eventItemId, currentPolicyItemList);
+                    Boolean eventFlag = false;
+                    //如果条件模式是,大于
+                    if (readModeType.equals(ConstantStr.EVENT_MODEL_EXCEED)) {
+                        if (dataValue.compareTo(modeValue) == 1) eventFlag = true;
+                        //如果条件模式是,小于
+                    } else if (readModeType.equals(ConstantStr.EVENT_MODEL_LOWER)) {
+                        if (dataValue.compareTo(modeValue) == -1) eventFlag = true;
+                        //如果条件模式是,等于
+                    } else if (readModeType.equals(ConstantStr.EVENT_MODEL_EQUAL)) {
+                        if (dataValue.compareTo(modeValue) == 0) eventFlag = true;
+                        //如果条件模式是,动态大于
+                    } else if (readModeType.equals(ConstantStr.EVENT_TRENDS_EXCEED)) {
+                        if (getEventValue(eventItemId, lastPolicyItemList).compareTo(modeValue) != 1) {
+                            if (dataValue.compareTo(modeValue) == 1) eventFlag = true;
+                        }
+                        //如果条件模式是,动态小于
+                    } else if (readModeType.equals(ConstantStr.EVENT_TRENDS_LOWER)) {
+                        if (getEventValue(eventItemId, lastPolicyItemList).compareTo(modeValue) != -1) {
+                            if (dataValue.compareTo(modeValue) == -1) eventFlag = true;
+                        }
+                    }
+                    if (eventFlag) {
+                        addPolicyItemList(currentPolicyItemList);
+                    }
+                    lastItemDataByLast = itemDataMapByTime.get(collect.get(index));
+                    lastPolicyItemList = currentPolicyItemList;
+                    currentPolicyItemList = new ArrayList<>(policyItemList);
+                });
+
+                }catch (Exception e){
+                    e.printStackTrace();
+                }
+            }*/
         };
     }
 

+ 99 - 4
industry-system/industry-da/src/main/java/com/example/opc_da/policy/FreReportDataPolicyTask.java

@@ -1,23 +1,27 @@
 package com.example.opc_da.policy;
 
 import cn.hutool.core.collection.CollUtil;
+//import cn.hutool.core.convert.Convert;
+//import cn.hutool.core.date.DatePattern;
+//import cn.hutool.core.date.DateUtil;
+//import cn.hutool.core.date.StopWatch;
 import com.cqcy.ei.influxdb.entity.Item;
 import com.example.opc_common.entity.ReportDataPolicy;
 import com.example.opc_common.entity.ReportDataPolicyItem;
 import com.example.opc_da.config.SpringContextUtils;
+//import com.example.opc_da.service.CollectorService;
 import com.example.opc_da.task.AsyncTask;
 import com.example.opc_da.util.QueryServiceUtil;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+//import java.util.stream.IntStream;
 
 public class FreReportDataPolicyTask extends ReportDataPolicyTask {
 
     private final AsyncTask asyncTask = SpringContextUtils.getBean(AsyncTask.class);
+    //private final CollectorService collectorService = SpringContextUtils.getBean(CollectorService.class);
 
     public FreReportDataPolicyTask(ReportDataPolicy reportDataPolicy) {
         super(reportDataPolicy);
@@ -48,6 +52,8 @@ public class FreReportDataPolicyTask extends ReportDataPolicyTask {
 
                 //查询此报表策略最新一轮数据
                 List<Item> itemDataByLast = getItemDataHistoryByLast();
+                //System.out.println("time0:" + itemDataByLast.get(0).getTime());
+                //System.out.println("time1:" + itemDataByLast.get(1).getTime());
 
                 //过滤已经存在的数据
                 List<Item> validItemDataList = itemDataByLast.stream().filter(i ->
@@ -67,6 +73,95 @@ public class FreReportDataPolicyTask extends ReportDataPolicyTask {
                 lastItemDataByLast = itemDataByLast;
 
             }
+            /*@Override
+            public void run() {
+                try {
+                //判断数据组是否到达了,今天的结束时间
+                Long currentTime = System.currentTimeMillis();
+                if (currentTime<=startTime || currentTime >= endTime){
+                    stop();
+                    return;
+                }
+
+                // 采集器采集多条数据,而策略采集只需要一条数据的条件判断
+                List<Map<String, Object>> itemIntervalMap = collectorService.getItemIntervalByReportDataPolicyId(policyItemList.get(0).getReportDataPolicyId());
+                if(itemIntervalMap == null || itemIntervalMap.size() == 0){
+                    throw new RuntimeException("采集器采集间隔设置错误");
+                }
+                Long collectorItemInterval = Convert.toLong(itemIntervalMap.get(0).get("itemInterval"));
+                String collectorItemUnit = Convert.toStr(itemIntervalMap.get(0).get("itemUnit"));
+                switch (collectorItemUnit){
+                    case "d" :
+                        collectorItemInterval *= 24 * 60 * 60 * 1000;
+                        break;
+                    case "h" :
+                        collectorItemInterval *= 60 * 60 * 1000;
+                        break;
+                    case "m" :
+                        collectorItemInterval *= 60 * 1000;
+                        break;
+                    case "s" :
+                        collectorItemInterval *= 1000;
+                        break;
+                    case "ms" :
+                        break;
+                    default:
+                        throw new RuntimeException("采集器采集间隔设置错误");
+                }
+                Long policyItemInterval = getPeriod() * 1000;
+                String groupByInterval = null;
+                if(collectorItemInterval < policyItemInterval){
+                    groupByInterval = getPeriod() + "s";
+                }else{
+                    groupByInterval = 1 + "ms";
+                }
+
+                //查询此报表策略最新一轮数据
+                List<Item> itemDataByLast = getItemDataHistoryByLast(groupByInterval);
+
+                // 过滤已经插入时间对应的数据
+                // 拆分为多条数据
+                Map<String, List<Item>> itemDataMapByTime = itemDataByLast.stream().filter(item -> {
+                    if(lastItemDataByLast.size() == 0){
+                        return true;
+                    }
+                    return DateUtil.parse(lastItemDataByLast.get(0).getTime(), DatePattern.NORM_DATETIME_MS_FORMAT)
+                            .before(DateUtil.parse(item.getTime(), DatePattern.NORM_DATETIME_MS_FORMAT));
+                }).collect(Collectors.groupingBy(Item::getTime));
+
+                if(itemDataMapByTime.isEmpty()){
+                    // 无新增数据
+                    return;
+                }
+
+                // 数据时间正排
+                List<String> collect = itemDataMapByTime.keySet().stream().sorted().collect(Collectors.toList());
+
+                // 循环插入新增的数据
+                IntStream.range(0,collect.size()).forEach(index -> {
+
+                    //将得到的数据转换为Map<点位名称,Item实体类>
+                    Map<String, Item> map = itemDataMapByTime.get(collect.get(index)).stream().collect(Collectors.toMap(Item::getName, i -> i));
+                    //将报表采集策略的点位,赋值相应的值
+                    for(ReportDataPolicyItem policyItem : policyItemList){
+                        Item item = map.get(policyItem.getItemReadName());
+                        QueryServiceUtil.itemParentCountValue(policyItem, item);
+                    }
+                    lastItemDataByLast = itemDataMapByTime.get(collect.get(index));
+                    if(index > 0){
+                        try {
+                            Thread.sleep(getPeriod() * 1000);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    addPolicyItemList(policyItemList);
+                });
+
+                }catch (Exception e){
+                    e.printStackTrace();
+                }
+            }*/
         };
     }
 

+ 6 - 1
industry-system/industry-da/src/main/java/com/example/opc_da/policy/ReportDataPolicyTask.java

@@ -88,7 +88,12 @@ public abstract class ReportDataPolicyTask implements DisposableBean {
 
     public List<Item> getItemDataHistoryByLast() {
         //查询点位相关的数据
-        return queryService.getItemDataHistoryByLast(items, itemGroupId);
+        return getItemDataHistoryByLast("1ms");
+    }
+
+    public List<Item> getItemDataHistoryByLast(String groupByInterval) {
+        //查询点位相关的数据
+        return queryService.getItemDataHistoryByLast(items, itemGroupId, groupByInterval);
     }
 
     public abstract void run();

+ 8 - 0
industry-system/industry-da/src/main/java/com/example/opc_da/service/CollectorService.java

@@ -3,6 +3,7 @@ package com.example.opc_da.service;
 import com.example.opc_common.entity.Collector;
 import com.example.opc_common.util.Result;
 
+import java.util.List;
 import java.util.Map;
 
 public interface CollectorService {
@@ -84,4 +85,11 @@ public interface CollectorService {
      */
     String getClientNameById(Integer id);
 
+
+    /**
+     * 查询采集数据组的采集间隔
+     * @return
+     */
+    /*List<Map<String, Object>> getItemIntervalByReportDataPolicyId(Integer reportDataPolicyId);*/
+
 }

+ 9 - 0
industry-system/industry-da/src/main/java/com/example/opc_da/service/impl/CollectorServiceImpl.java

@@ -327,4 +327,13 @@ public class CollectorServiceImpl implements CollectorService {
     public String getClientNameById(Integer id){
         return collectorDao.getClientNameById(id);
     }
+
+    /**
+     * 查询采集数据组的采集间隔
+     * @return
+     */
+    /*@Override
+    public List<Map<String, Object>> getItemIntervalByReportDataPolicyId(Integer reportDataPolicyId){
+        return collectorDao.getItemIntervalByReportDataPolicyId(reportDataPolicyId);
+    }*/
 }

+ 1 - 1
industry-system/industry-da/src/main/java/com/example/opc_da/util/QueryServiceUtil.java

@@ -73,7 +73,7 @@ public class QueryServiceUtil {
                 Map<String, Map<String, Item>> mapData = new HashMap<>();
                 for (String itemGroupId : itemData.keySet()) {
                     Map<String, Item> map = new HashMap<>();
-                    List<Item> itemDataByLast = queryService.getItemDataHistoryByLast(itemData.get(itemGroupId), itemGroupId);
+                    List<Item> itemDataByLast = queryService.getItemDataByLast(itemData.get(itemGroupId), itemGroupId);
                     if (Blank.isNotEmpty(itemDataByLast)) {
                         for (Item item : itemDataByLast) {
                             map.put(item.getName(), item);

+ 8 - 0
industry-system/industry-da/src/main/resources/mapper/CollectorDao.xml

@@ -182,4 +182,12 @@
         where ftci.item_group_id = trdp.item_group_id and ftci.client_id = #{clientId}
         ORDER BY trdp.report_data_policy_name
     </select>
+
+
+    <!--查询采集数据组-->
+    <!--<select id="getItemIntervalByReportDataPolicyId" resultType="java.util.Map">
+        select ftci.item_interval itemInterval,ftci.item_unit itemUnit from t_report_data_policy trdp
+        inner join f_telegraf_client_item ftci on ftci.client_id = trdp.client_id and ftci.item_group_id = trdp.item_group_id
+        where trdp.id = #{reportDataPolicyId}
+    </select>-->
 </mapper>

+ 15 - 2
新采集器/fast-api/src/main/java/com/ws/fastapi/util/ConfUtil.java

@@ -1,5 +1,6 @@
 package com.ws.fastapi.util;
 
+import cn.hutool.core.convert.Convert;
 import cn.hutool.core.io.file.FileWriter;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
@@ -55,6 +56,15 @@ public class ConfUtil {
     public static void createTelegrafConf(JSONObject obj) {
         String path = ConfUtil.getPath();
         JSONObject agent = obj.getJSONObject("agent");
+
+        String interval = agent.getStr("interval","");
+        if(StrUtil.isNotEmpty(interval) && interval.endsWith("d")){
+            Integer day = Convert.toInt(interval.substring(0,interval.length() - 1),null);
+            if(day == null || day == 0){
+                day = 1;
+            }
+            interval = day * 24 + "h";
+        }
         StringBuilder buffer = new StringBuilder();
         // 默认数据收集间隔
         // 是否在整点收集数据
@@ -68,7 +78,7 @@ public class ConfUtil {
         // 不要在 Telegraf 代理中设置host标记
         buffer.append("[agent]\n")
                 // 默认数据收集间隔
-                .append("interval = \"").append(agent.getStr("interval")).append("\"\n")
+                .append("interval = \"").append(interval).append("\"\n")
                 // 是否在整点收集数据
                 .append("round_interval = ").append(agent.getBool("roundInterval")).append("\n")
                 // telegraf一批次从output组件向外发送数据的大小
@@ -95,7 +105,10 @@ public class ConfUtil {
             JSONObject item = input.getJSONObject(i);
             if (Pact.OPC_DA.getValue().equals(item.getStr("type"))) {
                 buffer.append("[[inputs.execd]]\n")
-                        .append("command = [\"").append(getPath()).append("/opcda.exe\", \"-config\", \"").append(createOpcDaConf(item)).append("\"]\n")
+                        .append("command = [\"").append(getPath()).append("/opcda.exe\", \"-poll_interval=")
+                        .append(interval)
+                        .append("\", \"-config\", \"")
+                        .append(createOpcDaConf(item)).append("\"]\n")
                         .append("signal = \"none\"\n")
                         .append("restart_delay = \"").append(agent.getInt("restartDelay")).append("s\"\n")
                         .append("data_format = \"influx\"\n");