|
@@ -1,20 +1,36 @@
|
|
|
package com.cqcy.ei.influxdb.service.impl;
|
|
|
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import com.cqcy.ei.influxdb.config.InfluxDBProperties;
|
|
|
+import com.cqcy.ei.influxdb.entity.Item;
|
|
|
import com.cqcy.ei.influxdb.entity.MoveMeasurementEntity;
|
|
|
import com.cqcy.ei.influxdb.entity.RangeEnum;
|
|
|
import com.cqcy.ei.influxdb.mapper.StoreMapper;
|
|
|
import com.cqcy.ei.influxdb.service.InFluxDBService;
|
|
|
import com.cqcy.ei.influxdb.util.ClientUtil;
|
|
|
+import com.cqcy.ei.influxdb.util.InfluxDBUtil;
|
|
|
+import com.example.opc_common.entity.ItemParent;
|
|
|
+import com.example.opc_common.util.ConstantStr;
|
|
|
+import com.example.opc_common.util.DateUtil;
|
|
|
+import com.influxdb.client.InfluxDBClient;
|
|
|
+import com.influxdb.client.InfluxDBClientFactory;
|
|
|
+import com.influxdb.client.InfluxDBClientOptions;
|
|
|
import com.influxdb.client.domain.Bucket;
|
|
|
import com.influxdb.client.domain.Organization;
|
|
|
+import com.influxdb.client.domain.WritePrecision;
|
|
|
+import com.influxdb.client.write.Point;
|
|
|
+import com.influxdb.query.FluxTable;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
+import okhttp3.OkHttpClient;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.time.OffsetDateTime;
|
|
|
import java.time.ZoneOffset;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@Service
|
|
|
@AllArgsConstructor
|
|
@@ -87,20 +103,20 @@ public class InFluxDBServiceImpl implements InFluxDBService {
|
|
|
@Override
|
|
|
public boolean deleteMeasurementByTime(Integer time, RangeEnum rangeEnum, String measurement, String bucket) {
|
|
|
Date startTime = new Date();
|
|
|
- Date endTime = rangeEnum.getEndTime(startTime,time);
|
|
|
+ Date endTime = rangeEnum.getEndTime(startTime, time);
|
|
|
return deleteMeasurementByTime(endTime, startTime, measurement, bucket);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean moveMeasurementByTime(Date startTime, Date endTime, String bucket, String measurement, String targetBucket, String targetMeasurement) {
|
|
|
return storeMapper.subMeterMainByDate(
|
|
|
- new MoveMeasurementEntity()
|
|
|
- .startTime(startTime)
|
|
|
- .endTime(endTime)
|
|
|
- .bucket(bucket)
|
|
|
- .measurement(measurement)
|
|
|
- .targetBucket(targetBucket)
|
|
|
- .targetMeasurement(targetMeasurement)
|
|
|
+ new MoveMeasurementEntity()
|
|
|
+ .startTime(startTime)
|
|
|
+ .endTime(endTime)
|
|
|
+ .bucket(bucket)
|
|
|
+ .measurement(measurement)
|
|
|
+ .targetBucket(targetBucket)
|
|
|
+ .targetMeasurement(targetMeasurement)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -126,7 +142,7 @@ public class InFluxDBServiceImpl implements InFluxDBService {
|
|
|
|
|
|
@Override
|
|
|
public boolean moveMeasurementByTime(Date startTime, Integer time, RangeEnum rangeEnum, String bucket, String measurement, String targetBucket, String targetMeasurement) {
|
|
|
- Date endTime = rangeEnum.getEndTime(startTime,time);
|
|
|
+ Date endTime = rangeEnum.getEndTime(startTime, time);
|
|
|
return moveMeasurementByTime(endTime, startTime, bucket, measurement, targetBucket, targetMeasurement);
|
|
|
}
|
|
|
|
|
@@ -134,4 +150,140 @@ public class InFluxDBServiceImpl implements InFluxDBService {
|
|
|
public Integer getMeasurementCountByTime(Date startTime, Date endTime) {
|
|
|
return storeMapper.getMeasurementCountByTime(startTime.toInstant().getEpochSecond(), endTime.toInstant().getEpochSecond());
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Boolean batchInsert(String bucket, String measurement, List<? extends ItemParent> itemParentList) {
|
|
|
+ OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
|
|
|
+ .readTimeout(influxDBProperties.getReadTimeout(), TimeUnit.SECONDS)
|
|
|
+ .writeTimeout(influxDBProperties.getWriteTimeout(), TimeUnit.SECONDS)
|
|
|
+ .connectTimeout(influxDBProperties.getConnectTimeout(), TimeUnit.SECONDS);
|
|
|
+ // 设置客户端信息
|
|
|
+ InfluxDBClientOptions options = InfluxDBClientOptions.builder()
|
|
|
+ .okHttpClient(okHttpClientBuilder)
|
|
|
+ .bucket(bucket)
|
|
|
+ .url(influxDBProperties.getUrl())
|
|
|
+ .authenticateToken(influxDBProperties.getToken().toCharArray())
|
|
|
+ .org(influxDBProperties.getOrg())
|
|
|
+ .build();
|
|
|
+ // 创建连接配置对象
|
|
|
+ InfluxDBClient influxDBClient = InfluxDBClientFactory.create(options);
|
|
|
+ if (!checkBucketExist(bucket)) {
|
|
|
+ createBucket(bucket);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ // 构造要写入的Point集合
|
|
|
+ List<Point> points = new ArrayList<Point>();
|
|
|
+ itemParentList.forEach(itemParent -> {
|
|
|
+ points.add(Point.measurement(measurement)
|
|
|
+ .addTag("type", ConstantStr.ORIGINAL_VALUE + "")
|
|
|
+ .addField(itemParent.getItemReadName(), itemParent.getDataValue())
|
|
|
+ .time(DateUtil.strChangeDate(itemParent.getDataTime(),
|
|
|
+ "yyyy-MM-dd HH:mm:ss.SSS").getTime(), WritePrecision.MS));
|
|
|
+ points.add(Point.measurement(measurement)
|
|
|
+ .addTag("type", ConstantStr.CALCULATED_VALUE + "")
|
|
|
+ .addField(itemParent.getItemReadName(), itemParent.getCountDataValue())
|
|
|
+ .time(DateUtil.strChangeDate(itemParent.getDataTime(),
|
|
|
+ "yyyy-MM-dd HH:mm:ss.SSS").getTime(), WritePrecision.MS));
|
|
|
+ });
|
|
|
+ influxDBClient.getWriteApiBlocking().writePoints(points);
|
|
|
+ } finally {
|
|
|
+ influxDBClient.close();
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Item> queryLast(String bucket, String measurement, List<String> items, String type) {
|
|
|
+ OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
|
|
|
+ .readTimeout(influxDBProperties.getReadTimeout(), TimeUnit.SECONDS)
|
|
|
+ .writeTimeout(influxDBProperties.getWriteTimeout(), TimeUnit.SECONDS)
|
|
|
+ .connectTimeout(influxDBProperties.getConnectTimeout(), TimeUnit.SECONDS);
|
|
|
+ // 设置客户端信息
|
|
|
+ InfluxDBClientOptions options = InfluxDBClientOptions.builder()
|
|
|
+ .okHttpClient(okHttpClientBuilder)
|
|
|
+ .bucket(bucket)
|
|
|
+ .url(influxDBProperties.getUrl())
|
|
|
+ .authenticateToken(influxDBProperties.getToken().toCharArray())
|
|
|
+ .org(influxDBProperties.getOrg())
|
|
|
+ .build();
|
|
|
+ // 创建连接配置对象
|
|
|
+ InfluxDBClient influxDBClient = InfluxDBClientFactory.create(options);
|
|
|
+ try {
|
|
|
+ // 构造要查询的sql
|
|
|
+ StringBuilder sql = new StringBuilder();
|
|
|
+ Date endTime = new Date();
|
|
|
+ Date startTime = RangeEnum.HOUR.getEndTime(endTime, 1);
|
|
|
+ //from 指定数据源bucket
|
|
|
+ sql.append("from(bucket: \"" + bucket + "\")")
|
|
|
+ //|> 管道连接符
|
|
|
+ //range 指定起始时间段
|
|
|
+ .append("|> range(start: " + startTime.getTime() + ", stop: " + endTime.getTime() + ")")
|
|
|
+ .append("|> filter(fn: (r) => r[\"type\"] == \"" + type + "\")")
|
|
|
+ //filter 过滤
|
|
|
+ .append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
|
|
+ if (CollUtil.isNotEmpty(items)) {
|
|
|
+ sql.append("|> filter(fn: (r) => ");
|
|
|
+ for (int i = 0; i < items.size(); i++) {
|
|
|
+ sql.append("r[\"_field\"] ==\"" + items.get(i) + "\"");
|
|
|
+ if (i != items.size() - 1) sql.append(" or ");
|
|
|
+
|
|
|
+ }
|
|
|
+ sql.append(")");
|
|
|
+ }
|
|
|
+ //排序
|
|
|
+ sql.append("|> sort(columns:[\"_field\",\"_time\"])")
|
|
|
+ .append("|> last()");
|
|
|
+ System.out.println(sql);
|
|
|
+ List<FluxTable> fluxTableList = ClientUtil.client().getQueryApi().query(sql.toString());
|
|
|
+ return InfluxDBUtil.parseList(fluxTableList);
|
|
|
+ } finally {
|
|
|
+ influxDBClient.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Item> queryHistory(String bucket, String measurement, Date startTime, Date endTime, List<String> items, String type) {
|
|
|
+ OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
|
|
|
+ .readTimeout(influxDBProperties.getReadTimeout(), TimeUnit.SECONDS)
|
|
|
+ .writeTimeout(influxDBProperties.getWriteTimeout(), TimeUnit.SECONDS)
|
|
|
+ .connectTimeout(influxDBProperties.getConnectTimeout(), TimeUnit.SECONDS);
|
|
|
+ // 设置客户端信息
|
|
|
+ InfluxDBClientOptions options = InfluxDBClientOptions.builder()
|
|
|
+ .okHttpClient(okHttpClientBuilder)
|
|
|
+ .bucket(bucket)
|
|
|
+ .url(influxDBProperties.getUrl())
|
|
|
+ .authenticateToken(influxDBProperties.getToken().toCharArray())
|
|
|
+ .org(influxDBProperties.getOrg())
|
|
|
+ .build();
|
|
|
+ // 创建连接配置对象
|
|
|
+ InfluxDBClient influxDBClient = InfluxDBClientFactory.create(options);
|
|
|
+ try {
|
|
|
+ // 构造要查询的sql
|
|
|
+ StringBuilder sql = new StringBuilder();
|
|
|
+ //from 指定数据源bucket
|
|
|
+ sql.append("from(bucket: \"" + bucket + "\")")
|
|
|
+ //|> 管道连接符
|
|
|
+ //range 指定起始时间段
|
|
|
+ .append("|> range(start: " + startTime.getTime() + ", stop: " + endTime.getTime() + ")")
|
|
|
+ .append("|> filter(fn: (r) => r[\"type\"] == \"" + type + "\")")
|
|
|
+ //filter 过滤
|
|
|
+ .append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
|
|
+ if (CollUtil.isNotEmpty(items)) {
|
|
|
+ sql.append("|> filter(fn: (r) => ");
|
|
|
+ for (int i = 0; i < items.size(); i++) {
|
|
|
+ sql.append("r[\"_field\"] ==\"" + items.get(i) + "\"");
|
|
|
+ if (i != items.size() - 1) sql.append(" or ");
|
|
|
+
|
|
|
+ }
|
|
|
+ sql.append(")");
|
|
|
+ }
|
|
|
+ //排序
|
|
|
+ sql.append("|> sort(columns:[\"_field\",\"_time\"])");
|
|
|
+ System.out.println(sql);
|
|
|
+ List<FluxTable> fluxTableList = ClientUtil.client().getQueryApi().query(sql.toString());
|
|
|
+ return InfluxDBUtil.parseList(fluxTableList);
|
|
|
+ } finally {
|
|
|
+ influxDBClient.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|