浏览代码

修改插入过滤数据,未将空数据处理掉

zhoupeng 1 年之前
父节点
当前提交
8ce06e4e73

+ 22 - 2
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/InFluxDBService.java

@@ -5,6 +5,7 @@ import com.cqcy.ei.influxdb.entity.RangeEnum;
 import com.example.opc_common.entity.ReportDataPolicyItem;
 
 import javax.validation.constraints.NotBlank;
+import java.time.LocalDateTime;
 import java.util.Date;
 import java.util.List;
 
@@ -180,8 +181,27 @@ public interface InFluxDBService {
      */
     Boolean batchInsert(String bucket, String measurement, List<? extends ReportDataPolicyItem> policyItemList);
 
-
+    /**
+     * 查询过滤数据库的实时数据
+     *
+     * @param bucket
+     * @param measurement
+     * @param idList
+     * @param type
+     * @return
+     */
     List<Item> queryLast(String bucket, String measurement, List<String> idList, @NotBlank String type);
 
-    List<Item> queryHistory(String bucket, String measurement, Date startTime, Date endTime, List<String> idList, @NotBlank String type);
+    /**
+     * 查询过滤数据库的历史数据
+     *
+     * @param bucket
+     * @param measurement
+     * @param startDateTime
+     * @param endDateTime
+     * @param idList
+     * @param type
+     * @return
+     */
+    List<Item> queryHistory(String bucket, String measurement, LocalDateTime startDateTime, LocalDateTime endDateTime, List<String> idList, @NotBlank String type);
 }

+ 20 - 13
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/impl/InFluxDBServiceImpl.java

@@ -11,8 +11,8 @@ import com.cqcy.ei.influxdb.service.InFluxDBService;
 import com.cqcy.ei.influxdb.util.ClientUtil;
 import com.cqcy.ei.influxdb.util.InfluxDBUtil;
 import com.example.opc_common.entity.ReportDataPolicyItem;
+import com.example.opc_common.util.Blank;
 import com.example.opc_common.util.ConstantStr;
-import com.example.opc_common.util.DateUtil;
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.InfluxDBClientFactory;
 import com.influxdb.client.InfluxDBClientOptions;
@@ -25,12 +25,15 @@ import lombok.AllArgsConstructor;
 import okhttp3.OkHttpClient;
 import org.springframework.stereotype.Service;
 
+import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @Service
 @AllArgsConstructor
@@ -173,17 +176,21 @@ public class InFluxDBServiceImpl implements InFluxDBService {
         try {
             // 构造要写入的Point集合
             List<Point> points = new ArrayList<Point>();
-            policyItemList.forEach(policyItem -> {
+            //将空数据过滤掉
+            List<? extends ReportDataPolicyItem> collect = policyItemList.stream().filter(p -> Blank.isNotEmpty(p.getDataTime())).collect(Collectors.toList());
+            collect.forEach(c -> {
                 points.add(Point.measurement(measurement)
                         .addTag("type", ConstantStr.ORIGINAL_VALUE + "")
-                        .addField(policyItem.getId().toString(), policyItem.getDataValue())
-                        .time(DateUtil.strChangeDate(policyItem.getDataTime(),
-                                "yyyy-MM-dd HH:mm:ss.SSS").getTime(), WritePrecision.MS));
+                        .addField(c.getId().toString(), c.getDataValue())
+                        .time(LocalDateTime.parse(c.getDataTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))
+                                        .toInstant(ZoneOffset.of("+8")).toEpochMilli(),
+                                WritePrecision.MS));
                 points.add(Point.measurement(measurement)
                         .addTag("type", ConstantStr.CALCULATED_VALUE + "")
-                        .addField(policyItem.getId().toString(), policyItem.getCountDataValue())
-                        .time(DateUtil.strChangeDate(policyItem.getDataTime(),
-                                "yyyy-MM-dd HH:mm:ss.SSS").getTime(), WritePrecision.MS));
+                        .addField(c.getId().toString(), c.getCountDataValue())
+                        .time(LocalDateTime.parse(c.getDataTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))
+                                        .toInstant(ZoneOffset.of("+8")).toEpochMilli(),
+                                WritePrecision.MS));
             });
             influxDBClient.getWriteApiBlocking().writePoints(points);
         } finally {
@@ -211,13 +218,13 @@ public class InFluxDBServiceImpl implements InFluxDBService {
         try {
             // 构造要查询的sql
             StringBuilder sql = new StringBuilder();
-            Date endTime = new Date();
-            Date startTime = RangeEnum.HOUR.getEndTime(endTime, 1);
+            LocalDateTime endDateTime = LocalDateTime.now();
+            LocalDateTime startDateTime = endDateTime.minusHours(1);
             //from 指定数据源bucket
             sql.append("from(bucket: \"" + bucket + "\")")
                     //|> 管道连接符
                     //range 指定起始时间段
-                    .append("|> range(start: " + startTime.getTime() + ", stop: " + endTime.getTime() + ")")
+                    .append("|> range(start: " + startDateTime.toEpochSecond(ZoneOffset.of("+8")) + ", stop: " + endDateTime.toEpochSecond(ZoneOffset.of("+8")) + ")")
                     .append("|> filter(fn: (r) => r[\"type\"] == \"" + type + "\")")
                     //filter 过滤
                     .append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
@@ -242,7 +249,7 @@ public class InFluxDBServiceImpl implements InFluxDBService {
     }
 
     @Override
-    public List<Item> queryHistory(String bucket, String measurement, Date startTime, Date endTime, List<String> idList, String type) {
+    public List<Item> queryHistory(String bucket, String measurement, LocalDateTime startDateTime, LocalDateTime endDateTime, List<String> idList, String type) {
         OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
                 .readTimeout(influxDBProperties.getReadTimeout(), TimeUnit.SECONDS)
                 .writeTimeout(influxDBProperties.getWriteTimeout(), TimeUnit.SECONDS)
@@ -264,7 +271,7 @@ public class InFluxDBServiceImpl implements InFluxDBService {
             sql.append("from(bucket: \"" + bucket + "\")")
                     //|> 管道连接符
                     //range 指定起始时间段
-                    .append("|> range(start: " + startTime.toInstant().getEpochSecond() + ", stop: " + endTime.toInstant().getEpochSecond() + ")")
+                    .append("|> range(start: " + startDateTime.toEpochSecond(ZoneOffset.of("+8")) + ", stop: " + endDateTime.toEpochSecond(ZoneOffset.of("+8")) + ")")
                     .append("|> filter(fn: (r) => r[\"type\"] == \"" + type + "\")")
                     //filter 过滤
                     .append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");

+ 3 - 2
industry-system/industry-da/src/main/java/com/example/opc_da/util/InFluxDBServiceUtil.java

@@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -63,7 +64,7 @@ public class InFluxDBServiceUtil {
     }
 
     public List<? extends ReportDataPolicyItem> exchangeHistoryData(List<? extends ReportDataPolicyItem> tList,
-                                                                    String type, Date startTime, Date endTime) {
+                                                                    String type, LocalDateTime startDateTime, LocalDateTime endDateTime) {
         if (Blank.isNotEmpty(tList)) {
             //通过传入的泛型,得到相应的数据项信息
             List<com.example.opc_common.entity.Item> itemList = itemGroupDao.getItemParentByTList(tList);
@@ -82,7 +83,7 @@ public class InFluxDBServiceUtil {
             Map<String, Map<String, List<Item>>> mapData = new HashMap<>();
             for (String policyId : policyData.keySet()) {
                 List<Item> itemDataHistory = inFluxDBService
-                        .queryHistory(bucket, policyId, startTime, endTime, policyData.get(policyId), type);
+                        .queryHistory(bucket, policyId, startDateTime, endDateTime, policyData.get(policyId), type);
                 mapData.put(policyId, itemDataHistory.stream().collect(Collectors.groupingBy(Item::getName)));
             }