ws 1 年間 前
コミット
0485c6d2c7

+ 1 - 0
industry-system/cqcy-ei-common/src/main/java/com/example/opc_common/util/ClientInfoUtil.java

@@ -39,6 +39,7 @@ public class ClientInfoUtil {
         output.set("token", map.get("token"));
         output.set("organization", map.get("org"));
         output.set("bucket", map.get("bucket"));
+        output.set("type", map.get("subType"));
         return output;
     }
 

+ 1 - 1
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/entity/Item.java

@@ -29,7 +29,7 @@ public class Item {
     }
 
     public Item time(Instant instant) {
-        long millisecond = instant.getEpochSecond() * 1000;
+        long millisecond = instant.toEpochMilli();
         this.time = InfluxDBUtil.formatter.format(new Date(millisecond));
         return this;
     }

+ 1 - 1
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/mapper/QueryMapper.java

@@ -19,7 +19,7 @@ public interface QueryMapper {
      */
     @QueryApi
     List<Item> getItemDataByLast(@Param("startTime") Long startTime, @Param("endTime") Long endTime,
-                                 @Param("items") List<String> items, @Param("table") String dataSource);
+                                 @Param("items") List<String> items, @Param("table") String dataSource, @Param("myBucket") String myBucket, @Param("myTable") String myTable);
 
     /**
      * 查询时间范围内的历史数据

+ 12 - 2
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/impl/QueryServiceImpl.java

@@ -1,6 +1,7 @@
 package com.cqcy.ei.influxdb.service.impl;
 
 import cn.hutool.core.convert.Convert;
+import cn.hutool.core.date.DateUtil;
 import com.cqcy.ei.influxdb.config.InfluxDBProperties;
 import com.cqcy.ei.influxdb.entity.Item;
 import com.cqcy.ei.influxdb.entity.RangeEnum;
@@ -28,8 +29,17 @@ public class QueryServiceImpl implements QueryService {
     @Override
     public List<Item> getItemDataByLast(List<String> items, String dataSource) {
         Date startTime = new Date();
-        Date endTime = RangeEnum.DAY.getEndTime(startTime, 1);
-        return queryMapper.getItemDataByLast(endTime.toInstant().getEpochSecond(), startTime.toInstant().getEpochSecond(), items, dataSource);
+        Date endTime = RangeEnum.HOUR.getEndTime(startTime, 1);
+        String bucket = influxDBProperties.getBucket();
+        String measurement = influxDBProperties.getMeasurement();
+        if (influxDBProperties.getSubType() > 0) {
+            Date date = new Date();
+            bucket += "_" + DateUtil.year(date);
+            if(influxDBProperties.getSubType() == 2) {
+                measurement += "_" + DateUtil.year(date) + "_" + (DateUtil.month(date) + 1);
+            }
+        }
+        return queryMapper.getItemDataByLast(endTime.toInstant().getEpochSecond(), startTime.toInstant().getEpochSecond(), items, dataSource, bucket, measurement);
     }
 
     @Override

+ 2 - 2
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/util/InfluxDBUtil.java

@@ -13,7 +13,7 @@ import java.util.*;
 
 public class InfluxDBUtil {
 
-    public static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    public static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
     public static Object parse(List<FluxTable> list, String returnType) {
         if ("java.lang.Integer".equals(returnType)) {
@@ -68,7 +68,7 @@ public class InfluxDBUtil {
         List<String> measurements;
         Map<String, Object> params;
         // 小于今天,满足分库条件
-        if (DateUtil.beginOfDay(new Date()).getTime() > startTime.getTime()) {
+        if (properties.getSubType() > 0) {
             int startYear = DateUtil.year(startTime);
             int endYear = DateUtil.year(endTime);
             // 按年分库

+ 101 - 101
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/util/SysCronUtil.java

@@ -17,112 +17,112 @@ public class SysCronUtil {
      * 分库分表定时任务
      */
     public static void exec() {
-        StaticLog.info("====>开始执行分库分表定时任务");
-        // influxdb client配置信息
-        InfluxDBProperties properties = SpringUtil.getBean(InfluxDBProperties.class);
-        if (properties.getSubType() == 0) {
-            StaticLog.info("====>当前方案为不分库分表,退出定时任务");
-            return;
-        }
-        // 昨天
-        Date yesterday = DateUtil.yesterday();
-        // 开始时间
-        Date startTime = DateUtil.beginOfDay(yesterday);
-        // 结束时间
-        Date endTime = DateUtil.endOfDay(yesterday);
-        // 目标表
-        String targetMeasurement = properties.getMeasurement();
-        // 目标库
-        String targetBucket = properties.getBucket() + "_" + DateUtil.year(yesterday);
-        if (properties.getSubType() == 2) {
-            // 按年分库按月分表
-            targetMeasurement = properties.getMeasurement() + "_" + DateUtil.year(yesterday) + "_" + (DateUtil.month(yesterday) + 1);
-        }
-        InFluxDBService inFluxDBService = SpringUtil.getBean(InFluxDBService.class);
-        // 检查bucket是否存在,不存在则创建
-        inFluxDBService.checkAndCreateBucket(targetBucket);
-        long start = System.currentTimeMillis();
-        // 迁移数据
-        inFluxDBService.moveMeasurementByTime(startTime, endTime, targetBucket, targetMeasurement);
-        StaticLog.info("====>迁移数据执行完成,目标库:{},目标表:{},耗时:{}ms", targetBucket, targetMeasurement, System.currentTimeMillis() - start);
-        start = System.currentTimeMillis();
-        // 删除残留数据
-        inFluxDBService.deleteMeasurementByTime(startTime, endTime);
-        StaticLog.info("====>删除残留数据完成,耗时:{}ms", System.currentTimeMillis() - start);
-        StaticLog.info("====>完成执行分库分表定时任务");
+//        StaticLog.info("====>开始执行分库分表定时任务");
+//        // influxdb client配置信息
+//        InfluxDBProperties properties = SpringUtil.getBean(InfluxDBProperties.class);
+//        if (properties.getSubType() == 0) {
+//            StaticLog.info("====>当前方案为不分库分表,退出定时任务");
+//            return;
+//        }
+//        // 昨天
+//        Date yesterday = DateUtil.yesterday();
+//        // 开始时间
+//        Date startTime = DateUtil.beginOfDay(yesterday);
+//        // 结束时间
+//        Date endTime = DateUtil.endOfDay(yesterday);
+//        // 目标表
+//        String targetMeasurement = properties.getMeasurement();
+//        // 目标库
+//        String targetBucket = properties.getBucket() + "_" + DateUtil.year(yesterday);
+//        if (properties.getSubType() == 2) {
+//            // 按年分库按月分表
+//            targetMeasurement = properties.getMeasurement() + "_" + DateUtil.year(yesterday) + "_" + (DateUtil.month(yesterday) + 1);
+//        }
+//        InFluxDBService inFluxDBService = SpringUtil.getBean(InFluxDBService.class);
+//        // 检查bucket是否存在,不存在则创建
+//        inFluxDBService.checkAndCreateBucket(targetBucket);
+//        long start = System.currentTimeMillis();
+//        // 迁移数据
+//        inFluxDBService.moveMeasurementByTime(startTime, endTime, targetBucket, targetMeasurement);
+//        StaticLog.info("====>迁移数据执行完成,目标库:{},目标表:{},耗时:{}ms", targetBucket, targetMeasurement, System.currentTimeMillis() - start);
+//        start = System.currentTimeMillis();
+//        // 删除残留数据
+//        inFluxDBService.deleteMeasurementByTime(startTime, endTime);
+//        StaticLog.info("====>删除残留数据完成,耗时:{}ms", System.currentTimeMillis() - start);
+//        StaticLog.info("====>完成执行分库分表定时任务");
     }
 
     /**
      * 初始化执行分库分表
      */
     public static void init() {
-        StaticLog.info("====>开始执行分库分表初始化任务");
-        // influxdb client配置信息
-        InfluxDBProperties properties = SpringUtil.getBean(InfluxDBProperties.class);
-        if (properties.getSubType() == 0) {
-            StaticLog.info("====>当前方案为不分库分表,退出初始化任务");
-            return;
-        }
-        if (DateUtil.today().equals(properties.getStartTime())) {
-            StaticLog.info("====>当前时间:{}不需要执行初始化任务,退出初始化任务", DateUtil.today());
-            return;
-        }
-        InFluxDBService inFluxDBService = SpringUtil.getBean(InFluxDBService.class);
-        Date startTime = DateUtil.parse(properties.getStartTime());
-        Date endTime = DateUtil.date();
-        // 检查是否有为迁移历史数据
-        Integer count = inFluxDBService.getMeasurementCountByTime(startTime, endTime);
-        if (count == 0) {
-            StaticLog.info("====>当前没有需要迁移数据,退出初始化任务");
-            return;
-        }
-        // 获取已有表逻辑
-        List<Map<String, Object>> list = InfluxDBUtil.getMeasurement(startTime, endTime, properties);
-        Date start;
-        Date end;
-        long time = System.currentTimeMillis();
-        for (Map<String, Object> params : list) {
-            String bucket = Convert.toStr(params.get("bucket"));
-            if (!bucket.contains("_")) {
-                continue;
-            }
-            inFluxDBService.checkAndCreateBucket(bucket);
-            String[] str = bucket.split("_");
-            if (properties.getSubType() == 1) {
-                // 只分库
-                start = DateUtil.parse(str[1] + "-01-01");
-                end = DateUtil.endOfYear(start);
-                if (end.after(endTime)) {
-                    end = DateUtil.endOfDay(DateUtil.yesterday());
-                }
-                // 迁移数据
-                inFluxDBService.moveMeasurementByTime(start, end, bucket, properties.getMeasurement());
-                StaticLog.info("====>迁移数据执行完成,目标库:{},目标表:{},耗时:{}ms", bucket, properties.getMeasurement(), System.currentTimeMillis() - time);
-                time = System.currentTimeMillis();
-                // 删除残留数据
-                inFluxDBService.deleteMeasurementByTime(start, end);
-                StaticLog.info("====>删除残留数据完成,耗时:{}ms", System.currentTimeMillis() - time);
-                time = System.currentTimeMillis();
-                continue;
-            }
-            List<String> measurements = Convert.toList(String.class, params.get("measurement"));
-            for (String measurement : measurements) {
-                String[] tables = measurement.split("_");
-                start = DateUtil.parse(tables[1] + "-" + tables[2] + "-01");
-                end = DateUtil.endOfMonth(start);
-                if (end.after(endTime)) {
-                    end = DateUtil.endOfDay(DateUtil.yesterday());
-                }
-                // 迁移数据
-                inFluxDBService.moveMeasurementByTime(start, end, bucket, measurement);
-                StaticLog.info("====>迁移数据执行完成,目标库:{},目标表:{},耗时:{}ms", bucket, measurement, System.currentTimeMillis() - time);
-                time = System.currentTimeMillis();
-                // 删除残留数据
-                inFluxDBService.deleteMeasurementByTime(start, end);
-                StaticLog.info("====>删除残留数据完成,耗时:{}ms", System.currentTimeMillis() - time);
-                time = System.currentTimeMillis();
-            }
-        }
-        StaticLog.info("====>完成执行分库分表初始化任务");
+//        StaticLog.info("====>开始执行分库分表初始化任务");
+//        // influxdb client配置信息
+//        InfluxDBProperties properties = SpringUtil.getBean(InfluxDBProperties.class);
+//        if (properties.getSubType() == 0) {
+//            StaticLog.info("====>当前方案为不分库分表,退出初始化任务");
+//            return;
+//        }
+//        if (DateUtil.today().equals(properties.getStartTime())) {
+//            StaticLog.info("====>当前时间:{}不需要执行初始化任务,退出初始化任务", DateUtil.today());
+//            return;
+//        }
+//        InFluxDBService inFluxDBService = SpringUtil.getBean(InFluxDBService.class);
+//        Date startTime = DateUtil.parse(properties.getStartTime());
+//        Date endTime = DateUtil.date();
+//        // 检查是否有为迁移历史数据
+//        Integer count = inFluxDBService.getMeasurementCountByTime(startTime, endTime);
+//        if (count == 0) {
+//            StaticLog.info("====>当前没有需要迁移数据,退出初始化任务");
+//            return;
+//        }
+//        // 获取已有表逻辑
+//        List<Map<String, Object>> list = InfluxDBUtil.getMeasurement(startTime, endTime, properties);
+//        Date start;
+//        Date end;
+//        long time = System.currentTimeMillis();
+//        for (Map<String, Object> params : list) {
+//            String bucket = Convert.toStr(params.get("bucket"));
+//            if (!bucket.contains("_")) {
+//                continue;
+//            }
+//            inFluxDBService.checkAndCreateBucket(bucket);
+//            String[] str = bucket.split("_");
+//            if (properties.getSubType() == 1) {
+//                // 只分库
+//                start = DateUtil.parse(str[1] + "-01-01");
+//                end = DateUtil.endOfYear(start);
+//                if (end.after(endTime)) {
+//                    end = DateUtil.endOfDay(DateUtil.yesterday());
+//                }
+//                // 迁移数据
+//                inFluxDBService.moveMeasurementByTime(start, end, bucket, properties.getMeasurement());
+//                StaticLog.info("====>迁移数据执行完成,目标库:{},目标表:{},耗时:{}ms", bucket, properties.getMeasurement(), System.currentTimeMillis() - time);
+//                time = System.currentTimeMillis();
+//                // 删除残留数据
+//                inFluxDBService.deleteMeasurementByTime(start, end);
+//                StaticLog.info("====>删除残留数据完成,耗时:{}ms", System.currentTimeMillis() - time);
+//                time = System.currentTimeMillis();
+//                continue;
+//            }
+//            List<String> measurements = Convert.toList(String.class, params.get("measurement"));
+//            for (String measurement : measurements) {
+//                String[] tables = measurement.split("_");
+//                start = DateUtil.parse(tables[1] + "-" + tables[2] + "-01");
+//                end = DateUtil.endOfMonth(start);
+//                if (end.after(endTime)) {
+//                    end = DateUtil.endOfDay(DateUtil.yesterday());
+//                }
+//                // 迁移数据
+//                inFluxDBService.moveMeasurementByTime(start, end, bucket, measurement);
+//                StaticLog.info("====>迁移数据执行完成,目标库:{},目标表:{},耗时:{}ms", bucket, measurement, System.currentTimeMillis() - time);
+//                time = System.currentTimeMillis();
+//                // 删除残留数据
+//                inFluxDBService.deleteMeasurementByTime(start, end);
+//                StaticLog.info("====>删除残留数据完成,耗时:{}ms", System.currentTimeMillis() - time);
+//                time = System.currentTimeMillis();
+//            }
+//        }
+//        StaticLog.info("====>完成执行分库分表初始化任务");
     }
 }

+ 0 - 1
industry-system/cqcy-ei-influxdb/src/main/resources/flux/StoreMapper.xml

@@ -12,7 +12,6 @@
                 })
             )
             |> to(bucket: #{targetBucket})
-            |> count()
     </select>
 
     <!--查询源表是否含有历史数据-->

+ 1 - 0
telegraf-client/exe/conf/client.setting

@@ -33,6 +33,7 @@ url = http://192.168.1.168:8086
 token = zAyO0a0gdTHH8j7lo520TQFsoNuFhMEPkDLiurCr__uTbKrAUPyx1O4hMdwWT5eed-dIiHbKOJHOd7E6JpqifA==
 organization = jd
 bucket = test
+type = 0
 
 [telegraf.input]
 restartDelay = 10

+ 2 - 1
telegraf-client/src/main/java/com/cqcy/ei/telegraf/client/ClientApplication.java

@@ -11,8 +11,9 @@ import javafx.scene.image.Image;
 import javafx.stage.Stage;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Import;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
-
+@EnableScheduling
 @SpringBootApplication
 @Import(SpringUtil.class)
 public class ClientApplication extends AbstractJavaFxApplicationSupport {

+ 3 - 0
telegraf-client/src/main/java/com/cqcy/ei/telegraf/client/controller/MainController.java

@@ -1,6 +1,7 @@
 package com.cqcy.ei.telegraf.client.controller;
 
 import cn.hutool.core.io.file.FileReader;
+import cn.hutool.core.io.file.FileWriter;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
@@ -153,6 +154,8 @@ public class MainController implements Initializable {
             return Result.miss();
         }
         JSONObject json = JSONUtil.parseObj(configJson);
+        FileWriter writer = new FileWriter(ClientInfoUtil.path + "/configer.json");
+        writer.write(configJson);
         // 生成配置文件
         ConfUtil.createTelegrafConf(json);
         // 保存默认配置

+ 2 - 0
telegraf-client/src/main/java/com/cqcy/ei/telegraf/client/entity/TelegrafOutput.java

@@ -12,4 +12,6 @@ public class TelegrafOutput {
     private String organization;
 
     private String bucket;
+
+    private Integer type;
 }

+ 46 - 0
telegraf-client/src/main/java/com/cqcy/ei/telegraf/client/task/InfluxDbTask.java

@@ -0,0 +1,46 @@
+package com.cqcy.ei.telegraf.client.task;
+
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.io.file.FileReader;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.cqcy.ei.telegraf.client.util.ClientInfoUtil;
+import com.cqcy.ei.telegraf.client.util.ConfUtil;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+@Component
+public class InfluxDbTask {
+
+    @Scheduled(cron = "0 0 0 * * ?")
+    private void process() {
+        FileReader reader = new FileReader(ClientInfoUtil.path + "/configer.json");
+        JSONObject obj = JSONUtil.parseObj(reader.readString());
+        JSONObject output = obj.getJSONObject("output");
+        Integer type = output.getInt("type");
+       if (type > 0) {
+           Date date = DateUtil.date();
+
+
+           output.set("bucket", output.getStr("bucket") + "_" + DateUtil.year(date));
+           obj.set("output", output);
+
+           if (type == 2) {
+               JSONArray input = obj.getJSONArray("input");
+               for (int i = 0; i < input.size(); i++) {
+                   JSONObject item = input.getJSONObject(i);
+                   item.set("name", item.getStr("name") + "_" + DateUtil.year(date) + "_" + (DateUtil.month(date) + 1));
+                   input.set(i, item);
+               }
+               obj.set("input", input);
+           }
+           // 生成配置文件
+           ConfUtil.createTelegrafConf(obj);
+       }
+
+    }
+
+}

+ 1 - 0
telegraf-client/src/main/java/com/cqcy/ei/telegraf/client/util/ClientInfoUtil.java

@@ -85,6 +85,7 @@ public class ClientInfoUtil {
             setting.setByGroup("token", group, output.getStr("token", ""));
             setting.setByGroup("organization", group, output.getStr("organization", ""));
             setting.setByGroup("bucket", group, output.getStr("bucket", ""));
+            setting.setByGroup("type", group, output.getStr("type", "0"));
         }
         setting.store(new File("conf/client.setting"));
         update();

+ 1 - 1
telegraf-client/src/main/java/com/cqcy/ei/telegraf/client/util/ExecUtil.java

@@ -67,7 +67,7 @@ public class ExecUtil {
         try {
             String path = ClientInfoUtil.path;
             // 启动进程
-            process = Runtime.getRuntime().exec(path + "/telegraf.exe --config " + path + "/telegraf.conf");
+            process = Runtime.getRuntime().exec(path + "/telegraf.exe --config " + path + "/telegraf.conf --watch-config");
             String logFile = path + "/log/telegraf_" + System.currentTimeMillis() + ".log";
             FileUtil.touch(logFile);
             writer = new FileWriter(logFile);