Browse Source

1. The sharding scheduled task now creates buckets automatically (on collector startup, on collection-service startup, and the new bucket at year rollover).
2. Added a help-document download to the admin home page.
3. Check whether the collector needs to auto-start.

lhy 7 months ago
parent
commit
1431d698d5

+ 2 - 0
industry-admin/src/layout/components/Navbar.vue

@@ -199,6 +199,7 @@ import Breadcrumb from "@/components/Breadcrumb";
 import Hamburger from "@/components/Hamburger";
 import { getIndexNotice } from "@/api/dashboard";
 import {queryDictTypeByDictKeyTypes} from "@/api/system/dictType";
+import {downloadFileByStaticFile} from "@/utils/upload";
 // import ErrorLog from '@/components/ErrorLog'
 // import Screenfull from '@/components/Screenfull'
 // import SizeSelect from '@/components/SizeSelect'
@@ -355,6 +356,7 @@ export default {
       //   loading.close()
       //   showAlertWin(this, null, e)
       // })
+      downloadFileByStaticFile(process.env.VUE_APP_BASE_API + '/downloadFile/' + encodeURI("EI工业数据平台帮助手册V1.0.0.doc"))
     },
     /** 弹出层关闭事件 */
     dialogClose(done) {

+ 1 - 1
industry-system/cqcy-ei-influxdb/src/main/java/com/cqcy/ei/influxdb/service/impl/QueryServiceImpl.java

@@ -98,7 +98,7 @@ public class QueryServiceImpl implements QueryService {
         List<Map<String, Object>> list = InfluxDBUtil.getMeasurement(startTime, endTime, influxDBProperties);
         // Handle errors caused by a bucket that may not exist
         for (int i = 0; i < list.size(); i++) {
-            inFluxDBService.checkAndCreateBucket(Convert.toStr(list.get(0).get("bucket")));
+            inFluxDBService.checkAndCreateBucket(Convert.toStr(list.get(i).get("bucket")));
         }
         if (list.size() == 1) {
             return queryMapper.getItemDataHistoryOne(startTime.toInstant().getEpochSecond(), endTime.toInstant().getEpochSecond(), items, dataSource);

+ 13 - 2
新采集器/fast-api/pom.xml

@@ -15,6 +15,7 @@
 	<description>Demo project for Spring Boot</description>
 	<properties>
 		<java.version>1.8</java.version>
+		<com.influxdb.influxdb-client-java>6.11.0</com.influxdb.influxdb-client-java>
 	</properties>
 	<dependencies>
 		<dependency>
@@ -31,13 +32,13 @@
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-web</artifactId>
 		</dependency>
-	
+
 		<dependency>
 			<groupId>cn.hutool</groupId>
 			<artifactId>hutool-all</artifactId>
 			<version>5.7.20</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-log4j2</artifactId>
@@ -54,6 +55,16 @@
 			<artifactId>spring-boot-starter-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.influxdb</groupId>
+			<artifactId>influxdb-client-java</artifactId>
+			<version>${com.influxdb.influxdb-client-java}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>4.9.3</version>
+		</dependency>
 	</dependencies>
 
 	<build>

+ 63 - 0
新采集器/fast-api/src/main/java/com/ws/fastapi/FastApiApplication.java

@@ -1,5 +1,18 @@
 package com.ws.fastapi;
 
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.io.file.FileReader;
+import cn.hutool.core.io.file.FileWriter;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import cn.hutool.log.StaticLog;
+import com.ws.fastapi.controller.ClientController;
+import com.ws.fastapi.entity.Result;
+import com.ws.fastapi.util.InfluxDBClientUtil;
+import com.ws.fastapi.util.ConfUtil;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Import;
@@ -9,6 +22,12 @@ import com.ws.fastapi.util.ExecUtil;
 
 import cn.hutool.extra.spring.SpringUtil;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 @EnableScheduling
 @SpringBootApplication
 @Import(SpringUtil.class)
@@ -20,6 +39,50 @@ public class FastApiApplication {
 		ExecUtil.registerDrive();
 		// Stop the collector process
 		ExecUtil.stopTelegraf();
+
+		// Check the per-year bucket; create it if it has not been created yet
+		InfluxDBClientUtil influxDBClientUtil = SpringUtil.getBean(InfluxDBClientUtil.class);
+		String path = ConfUtil.getPath();
+		if(FileUtil.exist(path + "/configer.json")){
+			FileReader reader = new FileReader(path + "/configer.json");
+			if(StrUtil.isNotEmpty(reader.readString())){
+				JSONObject json = JSONUtil.parseObj(reader.readString());
+				JSONObject output = json.getJSONObject("output");
+				Integer type = output.getInt("type", 0);
+				if (type > 0) {
+					// Create the per-year bucket
+					influxDBClientUtil.initClient(json).checkAndCreateBucket();
+				}
+
+				// Check whether the collector needs to auto-start
+				reloadStartTelegraf();
+			}
+		}
+	}
+
+	/**
+	 * Check whether the collector needs to auto-start
+	 */
+	public static void reloadStartTelegraf() {
+		String path = ConfUtil.getPath();
+		if(FileUtil.exist(path + "/pid.json")) {
+			// Auto-start is required
+			FileReader reader = new FileReader(path + "/pid.json");
+			if(StrUtil.isNotEmpty(reader.readString())) {
+				JSONObject pidJsonObj = JSONUtil.parseObj(reader.readString());
+				List<String> list = new ArrayList<>();
+				JSONObject pidObj = new JSONObject();
+				for (Map.Entry<String, Object> entry : pidJsonObj.entrySet()) {
+					String itemGroupId = entry.getKey();
+					ExecUtil.startTelegraf("/telegraf_" + itemGroupId + ".conf");
+					String pid = ExecUtil.getTelegrafPid(list);
+					pidObj.set(itemGroupId, pid);
+					list.add(pid);
+				}
+				FileWriter writer1 = new FileWriter(path + "/pid.json");
+				writer1.write(pidObj.toJSONString(0));
+			}
+		}
 	}
 
 }
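
The new reloadStartTelegraf() step reads pid.json as a map from item-group id to telegraf process id, restarts each group's telegraf_<id>.conf, and rewrites the file with the fresh pids. Below is a minimal sketch of that read/rewrite cycle, assuming hutool on the classpath; the group ids and the pid placeholder are hypothetical, and the real code obtains pids via ExecUtil:

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;

public class PidJsonSketch {
    public static void main(String[] args) {
        // Hypothetical pid.json content: item-group id -> telegraf process id.
        JSONObject pidJson = JSONUtil.parseObj("{\"1001\":\"4312\",\"1002\":\"4318\"}");
        JSONObject refreshed = new JSONObject();
        for (String itemGroupId : pidJson.keySet()) {
            // The real code calls ExecUtil.startTelegraf("/telegraf_" + itemGroupId + ".conf")
            // and reads the new pid back via ExecUtil.getTelegrafPid(list).
            String newPid = "pid-of-telegraf_" + itemGroupId;
            refreshed.set(itemGroupId, newPid);
        }
        // toJSONString(0) writes compact JSON, matching the FileWriter call above.
        System.out.println(refreshed.toJSONString(0));
    }
}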

+ 30 - 4
新采集器/fast-api/src/main/java/com/ws/fastapi/config/InfluxDbTask.java

@@ -1,21 +1,27 @@
 package com.ws.fastapi.config;
 
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.io.file.FileReader;
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
+import com.ws.fastapi.util.InfluxDBClientUtil;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import com.ws.fastapi.util.ConfUtil;
 
+import javax.annotation.Resource;
 import java.util.Date;
-import java.util.Map;
 
 @Component
 public class InfluxDbTask {
 
+	@Resource
+	private InfluxDBClientUtil influxDBClientUtil;
+
 	@Scheduled(cron = "0 0 0 * * ?")
 	private void process() {
 		String path = ConfUtil.getPath();
@@ -25,10 +31,9 @@ public class InfluxDbTask {
 		Integer type = output.getInt("type");
 		if (type > 0) {
 			Date date = DateUtil.date();
-
-			output.set("bucket", output.getStr("bucket") + "_" + DateUtil.year(date));
+			String bucket = output.getStr("bucket") + "_" + DateUtil.year(date);
+			output.set("bucket", bucket);
 			json.set("output", output);
-
 			if (type == 2) {
 				JSONArray input = json.getJSONArray("input");
 				for (int i = 0; i < input.size(); i++) {
@@ -80,4 +85,25 @@ public class InfluxDbTask {
 
 	}
 
+	/**
+	 * In the last minute of each year (23:59:00 and 23:59:58 on Dec 31), check whether next year's bucket has been created
+	 */
+	@Scheduled(cron = "0,58 59 23 31 12 ?")
+	private void process2() {
+		String path = ConfUtil.getPath();
+		if(FileUtil.exist(path + "/configer.json")){
+			FileReader reader = new FileReader(path + "/configer.json");
+			if(StrUtil.isNotEmpty(reader.readString())){
+				JSONObject json = JSONUtil.parseObj(reader.readString());
+				JSONObject output = json.getJSONObject("output");
+				Integer type = output.getInt("type", 0);
+				if (type > 0) {
+					Date date = DateUtil.date();
+					String bucket = output.getStr("bucket") + "_" + (DateUtil.year(date) + 1);
+					// Check the per-year bucket; create it if it has not been created yet
+					influxDBClientUtil.initClient(json).checkAndCreateBucket(bucket);
+				}
+			}
+		}
+	}
 }
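
The process2() trigger "0,58 59 23 31 12 ?" fires twice at the very end of each year, at 23:59:00 and 23:59:58 on December 31, so next year's bucket exists before the rollover. A quick sketch to confirm the firing times, assuming Spring Framework 5.3+ where CronExpression is available (this helper class is illustrative only, not part of the commit):

import java.time.LocalDateTime;
import org.springframework.scheduling.support.CronExpression;

public class YearEndCronSketch {
    public static void main(String[] args) {
        // Fields: second minute hour day-of-month month day-of-week.
        CronExpression cron = CronExpression.parse("0,58 59 23 31 12 ?");
        LocalDateTime first = cron.next(LocalDateTime.of(2024, 1, 1, 0, 0));
        System.out.println(first);            // 2024-12-31T23:59
        System.out.println(cron.next(first)); // 2024-12-31T23:59:58
    }
}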

+ 9 - 1
新采集器/fast-api/src/main/java/com/ws/fastapi/controller/ClientController.java

@@ -1,6 +1,5 @@
 package com.ws.fastapi.controller;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -10,6 +9,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.ws.fastapi.util.InfluxDBClientUtil;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -29,10 +29,16 @@ import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 
+import javax.annotation.Resource;
+
 @RestController
 @RequestMapping("client/api")
 public class ClientController {
 
+
+	@Resource
+	private InfluxDBClientUtil influxDBClientUtil;
+
 	/**
 	 * Get the collector status
 	 *
@@ -74,6 +80,8 @@ public class ClientController {
 		}
 		FileWriter writer = new FileWriter(path + "/pid.json");
 		writer.write(pidJson.toJSONString(0));
+		// Check the per-year bucket; create it if it has not been created yet
+		influxDBClientUtil.initClient(json).checkAndCreateBucket();
 		return Result.okMsg("采集器启动成功");
 	}
 

+ 109 - 0
新采集器/fast-api/src/main/java/com/ws/fastapi/entity/InfluxDBProperties.java

@@ -0,0 +1,109 @@
+package com.ws.fastapi.entity;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class InfluxDBProperties {
+    // influxdb connection URL
+    private String url;
+
+    // influxdb API token
+    private String token;
+
+    // influxdb organization
+    private String org;
+
+    // influxdb default bucket
+    private String bucket;
+
+    // sharding scheme: 0 = none; 1 = bucket per year; 2 = bucket per year, measurement per month
+    private Integer subType;
+
+    // influxdb client read timeout
+    private Integer readTimeout;
+
+    // influxdb client write timeout
+    private Integer writeTimeout;
+
+    // influxdb client connect timeout
+    private Integer connectTimeout;
+
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getToken() {
+        return token;
+    }
+
+    public void setToken(String token) {
+        this.token = token;
+    }
+
+    public String getOrg() {
+        return org;
+    }
+
+    public void setOrg(String org) {
+        this.org = org;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
+    public Integer getSubType() {
+        return subType;
+    }
+
+    public void setSubType(Integer subType) {
+        this.subType = subType;
+    }
+
+    public Integer getReadTimeout() {
+        return readTimeout;
+    }
+
+    public void setReadTimeout(Integer readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
+    public Integer getWriteTimeout() {
+        return writeTimeout;
+    }
+
+    public void setWriteTimeout(Integer writeTimeout) {
+        this.writeTimeout = writeTimeout;
+    }
+
+    public Integer getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(Integer connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    @Override
+    public String toString() {
+        return "InfluxDBProperties{" +
+                "url='" + url + '\'' +
+                ", token='" + token + '\'' +
+                ", org='" + org + '\'' +
+                ", bucket='" + bucket + '\'' +
+                ", subType=" + subType +
+                ", readTimeout=" + readTimeout +
+                ", writeTimeout=" + writeTimeout +
+                ", connectTimeout=" + connectTimeout +
+                '}';
+    }
+}

+ 126 - 0
新采集器/fast-api/src/main/java/com/ws/fastapi/util/InfluxDBClientUtil.java

@@ -0,0 +1,126 @@
+package com.ws.fastapi.util;
+
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.StaticLog;
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.InfluxDBClientOptions;
+import com.influxdb.client.domain.Bucket;
+import com.influxdb.client.domain.Organization;
+import okhttp3.OkHttpClient;
+
+import cn.hutool.core.io.file.FileReader;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.ws.fastapi.entity.InfluxDBProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class InfluxDBClientUtil {
+
+    private InfluxDBProperties properties;
+
+    private InfluxDBClient client;
+    public InfluxDBClientUtil initClient(){
+        return initClient(null);
+    }
+    public InfluxDBClientUtil initClient(JSONObject configerJson){
+        if(configerJson == null){
+            initInfluxDBProperties();
+        }else {
+            initInfluxDBProperties(configerJson);
+        }
+        OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
+                .readTimeout(properties.getReadTimeout(), TimeUnit.SECONDS)
+                .writeTimeout(properties.getWriteTimeout(), TimeUnit.SECONDS)
+                .connectTimeout(properties.getConnectTimeout(), TimeUnit.SECONDS);
+        // Configure the client options
+        InfluxDBClientOptions options = InfluxDBClientOptions.builder()
+                .okHttpClient(okHttpClientBuilder)
+                .url(properties.getUrl())
+                .authenticateToken(properties.getToken().toCharArray())
+                .org(properties.getOrg())
+                .build();
+        // Create the client
+        client = InfluxDBClientFactory.create(options);
+        return this;
+    }
+
+    private void initInfluxDBProperties(JSONObject configerJson){
+        JSONObject output = configerJson.getJSONObject("output");
+        Integer type = output.getInt("type");
+        String url = output.getStr("url");
+        String organization = output.getStr("organization");
+        String token = output.getStr("token");
+        String bucket = output.getStr("bucket");
+        properties = new InfluxDBProperties();
+        properties.setReadTimeout(3600);
+        properties.setWriteTimeout(3600);
+        properties.setConnectTimeout(3600);
+        properties.setSubType(type);
+        properties.setUrl(url);
+        properties.setOrg(organization);
+        properties.setToken(token);
+        properties.setBucket(bucket);
+    }
+
+    private void initInfluxDBProperties(){
+        String path = ConfUtil.getPath();
+        if(FileUtil.exist(path + "/configer.json")){
+            FileReader reader = new FileReader(path + "/configer.json");
+            if(StrUtil.isNotEmpty(reader.readString())){
+                JSONObject json = JSONUtil.parseObj(reader.readString());
+                initInfluxDBProperties(json);
+            }
+        }
+    }
+
+    private boolean checkBucketExist(String bucket) {
+        try {
+            Bucket find = client.getBucketsApi().findBucketByName(bucket);
+            return find != null;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    private boolean createBucket(String bucket) {
+        try {
+            // Look up the organization by name
+            Organization organization = client.getOrganizationsApi()
+                    .findOrganizations().stream()
+                    .filter(it -> properties.getOrg().equals(it.getName()))
+                    .findFirst()
+                    .orElseThrow(IllegalStateException::new);
+            // Create the bucket
+            client.getBucketsApi().createBucket(bucket, organization);
+            StaticLog.info(bucket + "分库创建成功...");
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    public boolean checkAndCreateBucket(String bucket) {
+        if(client == null){
+            return false;
+        }
+        if (!checkBucketExist(bucket)) {
+            return createBucket(bucket);
+        }
+        return true;
+    }
+
+    public boolean checkAndCreateBucket() {
+        Date date = DateUtil.date();
+        String bucket = properties.getBucket() + "_" + DateUtil.year(date);
+        return checkAndCreateBucket(bucket);
+    }
+}
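
For reference, a minimal usage sketch of the fluent API added here, mirroring the calls in FastApiApplication and ClientController. The configer.json content and connection values below are placeholders, and SpringUtil.getBean assumes a running Spring context:

import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ws.fastapi.util.InfluxDBClientUtil;

public class BucketBootstrapSketch {
    public static void main(String[] args) {
        // Placeholder for the "output" section of configer.json.
        JSONObject json = JSONUtil.parseObj(
                "{\"output\":{\"type\":1,\"url\":\"http://localhost:8086\","
                + "\"organization\":\"my-org\",\"token\":\"my-token\",\"bucket\":\"ei_data\"}}");
        InfluxDBClientUtil util = SpringUtil.getBean(InfluxDBClientUtil.class);
        // checkAndCreateBucket() appends "_<currentYear>" to the configured bucket name
        // (e.g. ei_data_2025) and only creates the bucket when it does not already exist.
        boolean ready = util.initClient(json).checkAndCreateBucket();
        System.out.println("bucket ready: " + ready);
    }
}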