|
@@ -1,138 +0,0 @@
|
|
|
-package com.example.opc_da.task;
|
|
|
-
|
|
|
-import cn.hutool.core.collection.CollUtil;
|
|
|
-import com.cqcy.ei.influxdb.entity.Item;
|
|
|
-import com.cqcy.ei.influxdb.service.QueryService;
|
|
|
-import com.example.opc_common.entity.ReportDataPolicy;
|
|
|
-import com.example.opc_common.entity.ReportDataPolicyItem;
|
|
|
-import com.example.opc_common.util.Blank;
|
|
|
-import com.example.opc_common.util.ConstantStr;
|
|
|
-import com.example.opc_common.util.MathUtil;
|
|
|
-import com.example.opc_da.util.QueryServiceUtil;
|
|
|
-import com.example.opc_da.util.RedisUtil;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-
|
|
|
-import java.math.BigDecimal;
|
|
|
-import java.util.*;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-
|
|
|
-@Slf4j
|
|
|
-public class PolicyReadTask extends TimerTask {
|
|
|
- private final Timer timer;
|
|
|
- private final QueryService queryService;
|
|
|
- private final RedisUtil redisUtil;
|
|
|
- private final AsyncTask asyncTask;
|
|
|
- private final Long endTime;
|
|
|
- private final ReportDataPolicy reportDataPolicy;
|
|
|
- private final String dataSourceId;
|
|
|
- private final List<String> items;
|
|
|
-
|
|
|
- /**
|
|
|
- * 有参构造
|
|
|
- */
|
|
|
- /**
|
|
|
- * @param timer 任务停止时,摧毁timer
|
|
|
- * @param queryService 从influxdb获取数据
|
|
|
- * @param redisUtil 往缓存中添加获取数据
|
|
|
- * @param asyncTask 使用异步任务,将获取的数据添加到数据库
|
|
|
- * @param endTime 定时任务停止时间
|
|
|
- */
|
|
|
- public PolicyReadTask(Timer timer,
|
|
|
- QueryService queryService,
|
|
|
- RedisUtil redisUtil,
|
|
|
- AsyncTask asyncTask,
|
|
|
- Long endTime,
|
|
|
- ReportDataPolicy reportDataPolicy,
|
|
|
- String dataSourceId,
|
|
|
- List<String> items) {
|
|
|
- this.timer = timer;
|
|
|
- this.queryService = queryService;
|
|
|
- this.redisUtil = redisUtil;
|
|
|
- this.asyncTask = asyncTask;
|
|
|
- this.endTime = endTime;
|
|
|
- this.reportDataPolicy = reportDataPolicy;
|
|
|
- this.dataSourceId = dataSourceId;
|
|
|
- this.items = items;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
-// log.info("定时器内部开始执行");
|
|
|
- //查看redis中此报表数据策略的状态
|
|
|
- if (!getPolicyRunState(reportDataPolicy.getId())) {
|
|
|
- timer.cancel();
|
|
|
- }
|
|
|
- //判断数据组是否到达了,今天的结束时间
|
|
|
- if (System.currentTimeMillis() >= endTime) {
|
|
|
- timer.cancel();
|
|
|
- }
|
|
|
- Integer readMode = reportDataPolicy.getReadMode();
|
|
|
- Integer id = reportDataPolicy.getId();
|
|
|
- List<ReportDataPolicyItem> policyItemList = reportDataPolicy.getPolicyItemList();
|
|
|
- //查询数据项相关的数据
|
|
|
- List<Item> itemDataByLast = queryService.getItemDataByLast(items, dataSourceId);
|
|
|
- if (CollUtil.isEmpty(itemDataByLast)) {
|
|
|
- return;
|
|
|
- }
|
|
|
- //判断数据是否为重复数据
|
|
|
-// if (true) {
|
|
|
-// return;
|
|
|
-// }
|
|
|
- //将得到的数据转换为Map<数据项名称,Item实体类>
|
|
|
- Map<String, Item> map = itemDataByLast.stream().collect(Collectors.toMap(Item::getName, i -> i));
|
|
|
- //将报表数据策略的数据项,赋值相应的值
|
|
|
- for (int i = 0; i < policyItemList.size(); i++) {
|
|
|
- ReportDataPolicyItem policyItem = policyItemList.get(i);
|
|
|
- Item item = map.get(policyItem.getItemReadName());
|
|
|
- QueryServiceUtil.itemParentCountValue(policyItem, item);
|
|
|
- }
|
|
|
-
|
|
|
- //按照频率读取
|
|
|
- if (readMode.equals(ConstantStr.ON_FREQUENCY)) {
|
|
|
- asyncTask.addData(id, policyItemList);
|
|
|
- //按照值读取
|
|
|
- } else if (readMode.equals(ConstantStr.ON_CHANGE)) {
|
|
|
- changeRead(id, policyItemList);
|
|
|
- //按照条件读取
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 按照值改变读取
|
|
|
- *
|
|
|
- * @param id
|
|
|
- * @param policyItemList
|
|
|
- */
|
|
|
- private void changeRead(Integer id, List<ReportDataPolicyItem> policyItemList) {
|
|
|
- List<ReportDataPolicyItem> validPolicyItemList = new ArrayList<>();
|
|
|
- for (ReportDataPolicyItem policyItem : policyItemList) {
|
|
|
- Integer itemId = policyItem.getItemId();
|
|
|
- BigDecimal dataValue = new BigDecimal(policyItem.getDataValue());
|
|
|
- if (MathUtil.isMeetChange(
|
|
|
- (BigDecimal) redisUtil.get(ConstantStr.VALUE + itemId)
|
|
|
- , dataValue
|
|
|
- , new BigDecimal(reportDataPolicy.getModeValue())
|
|
|
- , reportDataPolicy.getReadModeType()
|
|
|
- )) validPolicyItemList.add(policyItem);
|
|
|
- redisUtil.set(ConstantStr.VALUE + itemId, dataValue);
|
|
|
- }
|
|
|
- if (Blank.isNotEmpty(validPolicyItemList)) asyncTask.addData(id, validPolicyItemList);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取redis中此时报表数据策略的运行状态
|
|
|
- *
|
|
|
- * @param id
|
|
|
- * @return
|
|
|
- */
|
|
|
- public Boolean getPolicyRunState(Integer id) {
|
|
|
- Boolean flage = (Boolean) redisUtil.get(ConstantStr.REPORT_DATA_POLICY + id);
|
|
|
- if (Blank.isEmpty(flage)) {
|
|
|
- flage = false;
|
|
|
- }
|
|
|
- return flage;
|
|
|
- }
|
|
|
-
|
|
|
-}
|