package com.slibra.quartz.task; import cn.hutool.core.date.DateUtil; import cn.hutool.http.HttpRequest; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONWriter; import com.google.protobuf.ByteString; import com.slibra.business.domain.*; import com.slibra.business.mapper.*; import com.slibra.business.req.ChatReq; import com.slibra.business.req.DecisionReq; import com.slibra.common.constant.MyConstants; import com.slibra.common.enums.BusinessEnum; import com.slibra.common.enums.DataSourceType; import com.slibra.common.utils.DateUtils; import com.slibra.common.utils.ip.IpUtils; import com.slibra.common.utils.uuid.IdUtils; import com.slibra.framework.datasource.DynamicDataSourceContextHolder; import inference.InferenceAPIsServiceGrpc; import inference.PredictionResponse; import inference.PredictionsRequest; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.slibra.common.utils.StringUtils; import org.springframework.util.CollectionUtils; import java.io.IOException; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import static com.slibra.common.constant.MyConstants.*; /** * 定时任务调度测试 * * */ @Component("ryTask") @Slf4j public class RyTask { public void ryMultipleParams(String s, Boolean b, Long l, Double d, Integer i) { System.out.println(StringUtils.format("执行多参方法: 字符串类型{},布尔类型{},长整型{},浮点型{},整形{}", s, b, l, d, i)); } public void ryParams(String params) { System.out.println("执行有参方法:" + params); } public void ryNoParams() { System.out.println("执行无参方法"); } //----------------------------------------------下面是新增的方法---------------------------------------------- @Autowired private TXinyiIndustryMapper xinyiIndustryMapper; @Autowired private TXinyiRobotMapper xinyiRobotMapper; @Autowired private TXinyiNormConfigMapper xinyiNormConfigMapper; @Autowired private TXinyiWarningRecordMapper xinyiWarningRecordMapper; @Autowired private TXinyiChatRecordMapper xinyiChatRecordMapper; @Autowired private TXinyiDailyMapper xinyiDailyMapper; // public final static StopWatch watch = new StopWatch("task"); public static final String[] queryTags = {"信义污水厂JS_COD_Value","信义污水厂JS_PH_Value","信义污水厂JS_SS_Value","信义污水厂JS_ZL_Value","信义污水厂JS_ZA_Value","信义污水厂JS_AD_Value","信义污水厂JS_T_Value","信义污水厂进水泵房液位","信义污水厂出水瞬时流量","信义污水厂升级出水COD","信义污水厂升级出水PH","信义污水厂升级出水SS","信义污水厂升级出水TN","信义污水厂升级出水TP","信义污水厂升级出水氨氮","信义污水厂AIT202_Value","信义污水厂AIT203_Value","信义污水厂AIT207_Value","信义污水厂AIT206_Value","信义污水厂AIT209_Value","信义污水厂AIT210_Value","信义污水厂进水TDS","信义污水厂FT101_Value","信义污水厂SWCHHYHLB1_R_Value","信义污水厂SWCHHYHLB2_R_Value","信义污水厂SWCHHYHLB3_R_Value","信义污水厂SWCHHYHLB4_R_Value","信义污水厂SWCHHYHLB5_R_Value","信义污水厂SWCHHYHLB6_R_Value","信义污水厂SWCWNHLB1_R_Value","信义污水厂SWCWNHLB2_R_Value","信义污水厂SWCWNHLB3_R_Value","信义污水厂SWCWNHLB4_R_Value","信义污水厂SWCWNHLB5_R_Value","信义污水厂GFJ1_R_Value","信义污水厂GFJ2_R_Value","信义污水厂GFJ3_R_Value","信义污水厂GFJ4_R_Value","信义污水厂GFJ5_R_Value","信义污水厂GFJ6_R_Value","信义污水厂GFJ1_KQLL_Value","信义污水厂GFJ2_KQLL_Value","信义污水厂GFJ3_KQLL_Value","信义污水厂GFJ4_KQLL_Value","信义污水厂GFJ5_KQLL_Value","信义污水厂GFJ6_KQLL_Value","信义污水厂实际碳源加药量","信义污水厂除磷加药瞬时流量", "信义污水厂_除磷P04预测值_"}; /** * 定时从工业库获取数据 * * 2024年4月17日17:44:15 调整逻辑:考虑到因断电等情况导致服务断电,所以不再同步最近一小时,而是同步从上次成功的最后一条数据开始。 */ public void getIndustryData(){ log.info("进入了定时同步工业库数据的任务"); //耗时工具 // watch.start("parseJob"); // 给定时间段的起始时间和结束时间 LocalDateTime endTime = LocalDateTime.now(); // LocalDateTime startTime = endTime.plusMinutes(-60); //获取上次最后一条同步的数据的日期到 分钟维度 String lastDateHour = this.xinyiIndustryMapper.getLastMinute(); log.info("获取上次同步工业库的最后一条记录的时间是{}", lastDateHour); lastDateHour = lastDateHour + ":00"; //开始时间 LocalDateTime startTime = LocalDateTime.parse(lastDateHour.replaceAll("/", "-").replace(" ", "T")); startTime = startTime.plusMinutes(60L);//加一分钟 从上次最后一条记录的下一分钟开始 /*LocalDateTime startTime = LocalDateTime.parse("2024-02-26T00:00:00"); LocalDateTime endTime = LocalDateTime.parse("2024-02-27T00:00:00");*/ // 每个小时的时间格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 循环按小时分割 LocalDateTime currentHour = startTime; //最终获取的数据 Map needMap = new LinkedHashMap<>(); while (currentHour.isBefore(endTime)) { String begin = currentHour.format(formatter); String end = currentHour.plusMinutes(5).format(formatter); // 输出当前小时的起始时间和结束时间 System.out.println("起始时间:" + begin); System.out.println("结束时间:" + end); // 当前小时加一小时,作为下一个小时的起始时间 currentHour = currentHour.plusMinutes(5); //每个小时查询一次数据 String url = "http://10.0.0.27:4568/api/v1/khquerydata"; HashMap req = new HashMap<>(); req.put("tagNames", queryTags); req.put("startTime", begin); req.put("endTime", end); req.put("recordNumbers", 100000); String body = HttpRequest.post(url).header("Authorization", "c2E6c2E=").header("clientName", "hongshan").body(JSON.toJSONString(req)).execute().body(); // System.out.println("body = " + body); List> list = new ArrayList<>(); //行转列数据处理 for (String queryTag : queryTags) { JSONArray array = JSON.parseObject(body).getJSONArray(queryTag); //特殊数据处理一 if(Objects.isNull(array) || array.isEmpty()){ System.out.println(queryTag + "查询到了空的数据,跳过本次循环"); continue; } int size = array.size(); //特殊数据处理二 if("0".equals(array.get(1) + "")){ System.out.println(queryTag + "查询到了数据,但是数据集合只有一条,且都是0"); continue; } //结合至少62个数据才满足条件(有可能获取不到) /*if(size < 62){ System.out.println(queryTag + "查询到了不符合条件的数据,跳过本次循环"); continue; }*/ //存放的数据集 //利用map去重 HashMap map = new LinkedHashMap<>(); for (int i = 2; i < size; i++) { // System.out.println(i + "" + array.get(i)); JSONArray oneRecord = JSON.parseArray(JSON.toJSONString(array.get(i))); //处理为空或者为0的数据 Object timeStampValue = oneRecord.get(2); if(Objects.isNull(timeStampValue) || "0".equals(timeStampValue + "")) continue; BigDecimal value = Objects.isNull(oneRecord.get(0)) ? null : new BigDecimal(oneRecord.get(0) + ""); long timestamp = (long) timeStampValue; String format = DateUtil.format(new Date(timestamp), DateUtils.YYYYMMDDHH_TS); map.put(format, queryTag + "-" + value); } list.add(map); } Set recordTimeSet = new HashSet<>(); Map recordMap = new HashMap<>(); for (int i = 0; i < list.size(); i++) { HashMap map = list.get(i); int finalJ = i; map.forEach((k, v) ->{ TXinyiIndustry industry = null; if(!recordTimeSet.contains(k)){//第一次 industry = new TXinyiIndustry(); recordTimeSet.add(k); recordMap.put(k, industry); }else{ industry = recordMap.get(k); } industry.setTestTime(k + ":00"); //2024年4月15日11:19:52 额外增加2个字段 industry.setTestDate(k.substring(0,10)); industry.setTestHour(k.substring(0,13)); //解析值 String[] split = v.split("-"); String type = split[0]; BigDecimal value = new BigDecimal(split[1]); if ("信义污水厂JS_COD_Value".equals(type)) { industry.setJsCod(value); } else if ("信义污水厂JS_PH_Value".equals(type)) { industry.setJsPh(value); } else if ("信义污水厂JS_SS_Value".equals(type)) { industry.setJsSs(value); } else if ("信义污水厂JS_ZL_Value".equals(type)) { industry.setJsTp(value); } else if ("信义污水厂JS_ZA_Value".equals(type)) { industry.setJsTn(value); } else if ("信义污水厂JS_AD_Value".equals(type)) { industry.setJsNh3(value); } else if ("信义污水厂JS_T_Value".equals(type)) { industry.setJsSwPh(value); } else if ("信义污水厂进水泵房液位".equals(type)) { industry.setJsBfyw(value); } else if ("信义污水厂出水瞬时流量".equals(type)) { industry.setCsSlqc(value); } else if ("信义污水厂升级出水COD".equals(type)) { industry.setCsCod(value); } else if ("信义污水厂升级出水PH".equals(type)) { industry.setCsPh(value); } else if ("信义污水厂升级出水SS".equals(type)) { industry.setCsSs(value); } else if ("信义污水厂升级出水TN".equals(type)) { industry.setCsTn(value); } else if ("信义污水厂升级出水TP".equals(type)) { industry.setCsTp(value); } else if ("信义污水厂升级出水氨氮".equals(type)) { industry.setCsNh3(value); } else if ("信义污水厂AIT202_Value".equals(type)) { industry.setOneHyzdDo(value); } else if ("信义污水厂AIT203_Value".equals(type)) { industry.setOneHymdDo(value); } else if ("信义污水厂AIT207_Value".equals(type)) { industry.setTwoHyzdDo(value); } else if ("信义污水厂AIT206_Value".equals(type)) { industry.setTwoHymdDo(value); } else if ("信义污水厂AIT209_Value".equals(type)) { industry.setOneMlss(value); } else if ("信义污水厂AIT210_Value".equals(type)) { industry.setTwoMlss(value); } else if ("信义污水厂进水TDS".equals(type)) { industry.setJsTds(value); } else if ("信义污水厂FT101_Value".equals(type)) { industry.setJsSlq(value); } else if ("信义污水厂SWCHHYHLB1_R_Value".equals(type)) { industry.setNHlbOneGp(value); } else if ("信义污水厂SWCHHYHLB2_R_Value".equals(type)) { industry.setNHlbTwoGp(value); } else if ("信义污水厂SWCHHYHLB3_R_Value".equals(type)) { industry.setNHlbThreeGp(value); } else if ("信义污水厂SWCHHYHLB4_R_Value".equals(type)) { industry.setNHlbFourGp(value); } else if ("信义污水厂SWCHHYHLB5_R_Value".equals(type)) { industry.setNhlBFiveGp(value); } else if ("信义污水厂SWCHHYHLB6_R_Value".equals(type)) { industry.setNHlbSixGp(value); } else if ("信义污水厂SWCWNHLB1_R_Value".equals(type)) { industry.setWHlbOneGp(value); } else if ("信义污水厂SWCWNHLB2_R_Value".equals(type)) { industry.setWHlbTwoGp(value); } else if ("信义污水厂SWCWNHLB3_R_Value".equals(type)) { industry.setWHlbThreeGp(value); } else if ("信义污水厂SWCWNHLB4_R_Value".equals(type)) { industry.setWHlbFourGp(value); } else if ("信义污水厂SWCWNHLB5_R_Value".equals(type)) { industry.setWHlbFiveGp(value); } else if ("信义污水厂GFJ1_R_Value".equals(type)) { industry.setFjOne(value); } else if ("信义污水厂GFJ2_R_Value".equals(type)) { industry.setFjTwo(value); } else if ("信义污水厂GFJ3_R_Value".equals(type)) { industry.setFjThree(value); } else if ("信义污水厂GFJ4_R_Value".equals(type)) { industry.setFjFour(value); } else if ("信义污水厂GFJ5_R_Value".equals(type)) { industry.setFjFive(value); } else if ("信义污水厂GFJ6_R_Value".equals(type)) { industry.setFjSix(value); } else if ("信义污水厂GFJ1_KQLL_Value".equals(type)) { industry.setKqllOne(value); } else if ("信义污水厂GFJ2_KQLL_Value".equals(type)) { industry.setKqllTwo(value); } else if ("信义污水厂GFJ3_KQLL_Value".equals(type)) { industry.setKqllThree(value); } else if ("信义污水厂GFJ4_KQLL_Value".equals(type)) { industry.setKqllFour(value); } else if ("信义污水厂GFJ5_KQLL_Value".equals(type)) { industry.setKqllFive(value); } else if ("信义污水厂GFJ6_KQLL_Value".equals(type)) { industry.setKqllSix(value); }else if ("信义污水厂实际碳源加药量".equals(type)) { industry.setSJTYJLY(value); }else if ("信义污水厂除磷加药瞬时流量".equals(type)) { industry.setCLJYSSLL(value); } else if ("信义污水厂_除磷P04预测值_".equals(type)) { industry.setCLP04YCZ(value); } //只有最后一次才执行数据库添加 if(finalJ == list.size()-1){ needMap.put(industry.getTestHour(), industry); } }); } } //保存数据 触发告警 决策 问答记录等等 needMap.forEach((k, industry) ->{ //2024年4月22日15:45:24 额外保存两个字段 数组 List extraList = new ArrayList<>(); extraList.add(industry.getOneHymdDo()); extraList.add(industry.getTwoHymdDo()); industry.setHycRjyAll(JSON.toJSONString(extraList)); extraList.clear(); extraList.add(industry.getOneHyzdDo()); extraList.add(industry.getTwoHyzdDo()); industry.setHycRjyZdAll(JSON.toJSONString(extraList)); extraList.clear(); extraList.add(industry.getOneMlss()); extraList.add(industry.getTwoMlss()); industry.setHycWnndAll(JSON.toJSONString(extraList)); //插入数据库 xinyiIndustryMapper.insertTXinyiIndustry(industry); //判断是否触发告警、接触告警、保存决策等等 this.handleWarning(industry); }); //执行完成 测试执行时间 //计时结束 // watch.stop(); // System.out.println(watch.getLastTaskName() + " 执行耗时:" + watch.getLastTaskTimeMillis() + " ms"); } private void handleWarning(TXinyiIndustry tXinyiIndustry) { log.info("进入了定时任务保存工业库数据并触发报警操作"); //获取配置表 List tXinyiNormConfigs = this.xinyiNormConfigMapper.selectTXinyiNormConfigList(null); if(CollectionUtils.isEmpty(tXinyiNormConfigs)) throw new RuntimeException("未查询到配置信息"); TXinyiNormConfig normConfig = tXinyiNormConfigs.get(0); //水质报警 this.handleSZWarning(tXinyiIndustry, normConfig); //2024年5月28日14:14:26 下面是新增的 生化报警处理 this.handleSHWarning(tXinyiIndustry, normConfig); } private void handleSHWarning(TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { //判断对应指标是否报警 然后调研大模型获取决策信息 BigDecimal jsSlq = tXinyiIndustry.getJsSlq(); //内回流比报警 BigDecimal nhlbSjz = normConfig.getNhlbSjz();//400 BigDecimal nhlbnkSxz = normConfig.getNhlbnkSxz();//360 BigDecimal nhlbqdsl = normConfig.getNHLBQDSL(); BigDecimal nhlbdsjll = normConfig.getNHLBDSJLL(); BigDecimal nhlbgzxl = normConfig.getNHLBGZXL(); if(!Objects.isNull(jsSlq) && !Objects.isNull(nhlbnkSxz) && !Objects.isNull(nhlbqdsl) && !Objects.isNull(nhlbdsjll) && !Objects.isNull(nhlbgzxl)){ BigDecimal divide = nhlbqdsl.multiply(nhlbdsjll).multiply(nhlbgzxl).divide(jsSlq, 4, RoundingMode.HALF_UP); if(!Objects.isNull(nhlbSjz)){ handleXinYiWarningsSH(nhlbSjz, divide, nhlbnkSxz, BusinessEnum.WarningCategoryEnum.NHLB.getCode(), tXinyiIndustry, normConfig); } } //外回流比报警 BigDecimal whlbqdsl = normConfig.getWHLBQDSL(); BigDecimal whlbdsjll = normConfig.getWHLBDSJLL(); BigDecimal whlbgzxl = normConfig.getWHLBGZXL(); BigDecimal whlbnkXxz = normConfig.getWhlbnkXxz(); BigDecimal whlbSjz = normConfig.getWhlbSjz(); if(!Objects.isNull(whlbqdsl) && !Objects.isNull(whlbdsjll) && !Objects.isNull(whlbgzxl)){ BigDecimal divide = whlbqdsl.multiply(whlbdsjll).multiply(whlbgzxl).divide(jsSlq, 4, RoundingMode.HALF_UP); if(!Objects.isNull(whlbSjz)){ handleXinYiWarningsSH(whlbSjz, divide, whlbnkXxz, BusinessEnum.WarningCategoryEnum.WHLB.getCode(), tXinyiIndustry, normConfig); } } //最新的一条日报数据 List tXinyiDailies = this.xinyiDailyMapper.selectTXinyiDailyList(null); if(CollectionUtils.isEmpty(tXinyiDailies)) return;//肯定不会出现这种情况 因为有很多历史数据了 TXinyiDaily tXinyiDaily = tXinyiDailies.get(0); log.info("生化报警获取日报的最新的一条数据为{}", JSON.toJSONString(tXinyiDaily)); BigDecimal jsBod5 = tXinyiDaily.getJsBod5(); //污泥负荷(需要从日报获取数据) 计算 + 部分数据从日报获取 //污泥负荷=[Ls]=24*([BOD_in]-[BOD_off])*[Q_in]/[MLSS]/([V_hao]+[V_que]+[V_yan])/2 kgBOD/(kgMLSS·d) //todo 后面再加 计算太复杂 //碳氮比(需要从日报获取数据) //进水碳氮比=[b]=[BOD_in]/[TN_in] BigDecimal jsTn = tXinyiIndustry.getJsTn(); if(!Objects.isNull(jsBod5) && !Objects.isNull(jsTn)){ BigDecimal divide = jsBod5.divide(jsTn, 4, RoundingMode.HALF_UP); BigDecimal jstdbnkzSxz = normConfig.getJstdbnkzSxz(); BigDecimal cstdbSjz = normConfig.getCstdbSjz(); if(!Objects.isNull(whlbSjz)){ handleXinYiWarningsSH(cstdbSjz, divide, jstdbnkzSxz, BusinessEnum.WarningCategoryEnum.TDB.getCode(), tXinyiIndustry, normConfig); } } //碳磷比(需要从日报获取数据) //进水碳磷比=[c]=[BOD_in]/[TP_in] BigDecimal jsTp = tXinyiIndustry.getJsTp(); BigDecimal jstlbSjz = normConfig.getJstlbSjz(); if(!Objects.isNull(jsBod5) && !Objects.isNull(jsTp)){ BigDecimal divide = jsBod5.divide(jsTp, 4, RoundingMode.HALF_UP); BigDecimal jstlbNkz = normConfig.getJstlbNkz(); if(!Objects.isNull(jstlbSjz)){ handleXinYiWarningsSH(jstlbSjz, divide, jstlbNkz, BusinessEnum.WarningCategoryEnum.TLB.getCode(), tXinyiIndustry, normConfig); } } //BOD比COD(需要从日报获取数据) //进水BOD与COD比值数据=[d]=[BOD_in]/[COD_in] BigDecimal jsCod = tXinyiIndustry.getJsCod(); if(!Objects.isNull(jsBod5) && !Objects.isNull(jsCod)){ BigDecimal jsbodycodbzSzj = normConfig.getJsbodycodbzSzj(); String jsbodycodbzGkz = normConfig.getJsbodycodbzGkz(); if(StringUtils.isNotBlank(jsbodycodbzGkz) && jsbodycodbzGkz.contains("-")){ String[] split = jsbodycodbzGkz.split("-"); BigDecimal gkz = new BigDecimal(split[1]); BigDecimal divide = jsBod5.divide(jsCod, 4, RoundingMode.HALF_UP); handleXinYiWarningsSH(gkz, divide, jsbodycodbzSzj, BusinessEnum.WarningCategoryEnum.BODCODB.getCode(), tXinyiIndustry, normConfig); } } //好氧区DO(一池) BigDecimal shcHyOneDo = tXinyiIndustry.getOneHymdDo(); BigDecimal hycrjysjzSxz = normConfig.getHycrjysjzSxz(); BigDecimal hycrjyNkz = normConfig.getHycrjyNkz(); if(!Objects.isNull(shcHyOneDo) && !Objects.isNull(hycrjysjzSxz)){ handleXinYiWarningsSH(hycrjysjzSxz, shcHyOneDo, hycrjyNkz, BusinessEnum.WarningCategoryEnum.HYQDO_ONE.getCode(), tXinyiIndustry, normConfig); } //好氧区DO(二池) BigDecimal shcHyTwoDo = tXinyiIndustry.getTwoHymdDo(); if(!Objects.isNull(shcHyTwoDo) && !Objects.isNull(hycrjysjzSxz)){ handleXinYiWarningsSH(hycrjysjzSxz, shcHyTwoDo, hycrjyNkz, BusinessEnum.WarningCategoryEnum.HYQDO_TWO.getCode(), tXinyiIndustry, normConfig); } //气水比(需要从日报获取数据) BigDecimal gfjgzts = normConfig.getGFJGZTS(); BigDecimal gfjckll = normConfig.getGFJCKLL(); BigDecimal shcqbSjz = normConfig.getShcqbSjz(); if(!Objects.isNull(gfjgzts) && !Objects.isNull(gfjckll) && !Objects.isNull(shcqbSjz) && !Objects.isNull(jsSlq)){ BigDecimal qsb = gfjgzts.multiply(gfjckll).divide(jsSlq, 4, RoundingMode.HALF_UP); BigDecimal shcqbNkz = normConfig.getShcqbNkz(); handleXinYiWarningsSH(shcqbSjz, qsb, shcqbNkz, BusinessEnum.WarningCategoryEnum.WHLB.getCode(), tXinyiIndustry, normConfig); } } /** * 处理信义生化报警的逻辑 统一处理 * @param bzz * @param currentVal * @param gkz * @param category * @param tXinyiIndustry * @param normConfig */ private void handleXinYiWarningsSH(BigDecimal bzz, BigDecimal currentVal, BigDecimal gkz, String category, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { BigDecimal multiply = bzz.multiply(new BigDecimal(MyConstants.SCALE_VALUE)); TXinyiWarningRecord tXinyiWarningRecord = new TXinyiWarningRecord(); /*String category = BusinessEnum.WarningCategoryEnum.CS_AD.getCode();*/ tXinyiWarningRecord.setStatus(0); tXinyiWarningRecord.setType(1); tXinyiWarningRecord.setCategory(category); tXinyiWarningRecord.setTime(DateUtils.getNowDate()); tXinyiWarningRecord.setWarningVal(currentVal); tXinyiWarningRecord.setDesignVal(bzz); tXinyiWarningRecord.setControlVal(gkz); tXinyiWarningRecord.setCreateBy(WARNING_DEFAULT_CREATE); tXinyiWarningRecord.setCreateTime(DateUtils.getNowDate()); //2024年5月25日17:52:33 如果工业库获取不到数据,也触发报警,但是不调用决策接口 if (Objects.isNull(currentVal)) { tXinyiWarningRecord.setReason(category + EXCEPTION_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_NO_DATE); } else if (currentVal.compareTo(multiply) > 0) {//一级 tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_ONE); } else if (currentVal.compareTo(bzz) >= 0 && currentVal.compareTo(multiply) <= 0) {//二级 tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_TWO); } else if (!Objects.isNull(gkz) && currentVal.compareTo(gkz) > 0) { tXinyiWarningRecord.setReason(category + CHAOGUANKONG_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_THREE); } else { tXinyiWarningRecord = null;//这种的无需处理 } //当前状态正常 需要查询历史有无正在报警的数据,如果有,将报警状态改完2(系统自动关闭) List tXinyiWarningRecords = this.xinyiWarningRecordMapper.selectTXinyiWarningRecordList(TXinyiWarningRecord.builder().delFlag(0).type(1).category(category).status(0).build()); if(Objects.isNull(tXinyiWarningRecord)){//数据正常,无告警信息 if(!CollectionUtils.isEmpty(tXinyiWarningRecords)){ log.info( "{}:现在恢复正常,历史报警数据为{}", category,JSON.toJSONString(tXinyiWarningRecords)); for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) { xinyiWarningRecord.setStatus(2); Date nowDate = DateUtils.getNowDate(); xinyiWarningRecord.setOffTime(nowDate); xinyiWarningRecord.setUpdateTime(nowDate); xinyiWarningRecord.setUpdateBy(WARNING_DEFAULT_CREATE); this.xinyiWarningRecordMapper.updateTXinyiWarningRecord(xinyiWarningRecord); } } }else{//有新的告警信息 if(CollectionUtils.isEmpty(tXinyiWarningRecords)){//之前没有告警记录 //保存到数据库中 this.xinyiWarningRecordMapper.insertTXinyiWarningRecord(tXinyiWarningRecord); if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的 this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); }else { //继续调用决策(普通问答) this.askBigModelForSHWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); } }else{ log.info("{}:之前已经有过告警记录了,且还是继续报警,无需重复添加报警,但是决策仍然要调用", category); for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) {//理论上只有一个的 if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的 this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); }else { //继续调用决策(普通问答) this.askBigModelForSHWarning(xinyiWarningRecord, tXinyiIndustry, normConfig); } } } } } private void askBigModelForSHWarning(TXinyiWarningRecord xinyiWarningRecord, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { log.info("进入了后台接口调⽤⼤模型获取问答结果处理"); StringBuilder sb = new StringBuilder(); String sessionId = IdUtils.simpleUUID(); ChatReq chatReq = new ChatReq(); // String ipAddr = IpUtils.getIpAddr();//获取用户的ip地址 传给大模型 String ipAddr = "";//获取用户的ip地址 传给大模型 定时任务获取不到ip地址 int counts = 1;//默认是第一次 //这种问答 没有历史问答的概念 直接把问题扔进去就行 无需查询历史记录 List historyDates = new ArrayList<>(); //构建问题(替换提示词中的占位符) String shWarningPrompt = SH_WARNING_PROMPT; shWarningPrompt =shWarningPrompt.replace("#{0}", xinyiWarningRecord.getReason()); shWarningPrompt =shWarningPrompt.replace("#{1}", String.valueOf(xinyiWarningRecord.getDesignVal())); shWarningPrompt =shWarningPrompt.replace("#{2}", String.valueOf(xinyiWarningRecord.getControlVal())); shWarningPrompt =shWarningPrompt.replace("#{3}", String.valueOf(xinyiWarningRecord.getWarningVal())); historyDates.add(shWarningPrompt); // 获取输出流 ManagedChannel channel = null; try { channel = ManagedChannelBuilder.forAddress("10.0.0.24", 17070) .usePlaintext() .build(); InferenceAPIsServiceGrpc.InferenceAPIsServiceBlockingStub stub = InferenceAPIsServiceGrpc.newBlockingStub(channel); String dataJson = "{\"bot_id\":\"721\",\"exp_id\":\"721\",\"session_id\":\"" + sessionId + "\",\"use_rag\":\"true\",\"prompt\":\"你是⼀个资深⽔务领域专家,能回答各种⽔务相关问题\",\"history_dia\":" + JSON.toJSONString(historyDates) + ",\"generate_args\":{\"max_new_tokens\":2048,\"max_length\":4096,\"num_beams\":1,\"do_sample\":true,\"top_p\":0.7,\"temperature\":0.95},\"extra\":{ \"ip_address\": \"" + ipAddr + "\" },\"strengthen\":" + (true) + "}"; log.info("请求大模型的问答参数为{}", dataJson); PredictionsRequest request = PredictionsRequest.newBuilder() .setModelName("slibra_bot") .putInput("method", ByteString.copyFrom("infer_stream", "utf-8"))//推理 .putInput("data", ByteString.copyFrom(dataJson, "utf-8")) .buildPartial(); Iterator predictions = stub.streamPredictions(request); //将结果记录到问答表 while (predictions.hasNext()) { String responseStr = predictions.next().getPrediction().toStringUtf8(); log.info("大模型问答返回的原始结果为{}", responseStr); responseStr = JSON.parseObject(responseStr).getString("message"); if("complete".equals(responseStr)){ log.info("结尾语句并且是非JSON,无需处理"); }else{ sb.append(responseStr); } } //将问答更新到数据库中 chatReq.setSessionId(sessionId); chatReq.setType(1);//0问答 1决策 2本地 3仿真预测 chatReq.setModule(3);//0专家问答 1智能工单 2智能体助手 3告警 4简报 String showVal = this.buildShowValue(xinyiWarningRecord, tXinyiIndustry, normConfig); chatReq.setShowVal(showVal); chatReq.setQuestion(shWarningPrompt); chatReq.setAnswer(sb.toString()); chatReq.setWarningId(String.valueOf(xinyiWarningRecord.getId())); chatReq.setCounts(counts);//问答次数 chatReq.setUserId(WARNING_DEFAULT_CREATE); chatReq.setCreateBy(WARNING_DEFAULT_CREATE); chatReq.setCreateTime(DateUtils.getNowDate()); this.xinyiChatRecordMapper.insertTXinyiChatRecord(chatReq); } catch (IOException e) { throw new RuntimeException(e); } finally { // 关闭输出流 channel.shutdown(); } } private void handleSZWarning(TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { //出水相关 //出水COD报警 BigDecimal csCod = tXinyiIndustry.getCsCod(); BigDecimal cscodBzz = normConfig.getCscodBzz(); BigDecimal cscodGkz = normConfig.getCscodGkz(); if(!Objects.isNull(cscodBzz)){ handleXinYiWarningsCs(cscodBzz, csCod, cscodGkz, BusinessEnum.WarningCategoryEnum.CS_COD.getCode(), tXinyiIndustry, normConfig); } //出水总磷超标报警 BigDecimal csTp = tXinyiIndustry.getCsTp(); BigDecimal cszlBzz = normConfig.getCszlBzz(); BigDecimal cszlGkz = normConfig.getCszlGkz(); if(!Objects.isNull(cszlBzz)){ handleXinYiWarningsCs(cszlBzz, csTp, cszlGkz, BusinessEnum.WarningCategoryEnum.CS_ZL.getCode(), tXinyiIndustry, normConfig); } //出水总氮超标报警 BigDecimal csTn = tXinyiIndustry.getCsTn(); BigDecimal cszzBzz = normConfig.getCszzBzz(); BigDecimal cszzGkz = normConfig.getCszzGkz(); if(!Objects.isNull(cszzBzz)){ handleXinYiWarningsCs(cszzBzz, csTn, cszzGkz, BusinessEnum.WarningCategoryEnum.CS_ZD.getCode(), tXinyiIndustry, normConfig); } //出水氨氮超标报警 BigDecimal csNh3 = tXinyiIndustry.getCsNh3(); BigDecimal csadBzz = normConfig.getCsadBzz(); BigDecimal csadGkz = normConfig.getCsadGkz(); if(!Objects.isNull(csadBzz)){ handleXinYiWarningsCs(csadBzz, csNh3, csadGkz, BusinessEnum.WarningCategoryEnum.CS_AD.getCode(), tXinyiIndustry, normConfig); } //出水SS超标报警 BigDecimal csSS = tXinyiIndustry.getCsSs(); BigDecimal csSSBzz = normConfig.getCsssBzz(); BigDecimal csssGkz = normConfig.getCsssGkz(); if(!Objects.isNull(csSSBzz)){ handleXinYiWarningsCs(csSSBzz, csSS, csssGkz, BusinessEnum.WarningCategoryEnum.CS_SS.getCode(), tXinyiIndustry, normConfig); } //进水相关报警 //进水总磷超标报警 BigDecimal jsTp = tXinyiIndustry.getJsTp(); BigDecimal jszlSjz = normConfig.getJszlSjz(); if(!Objects.isNull(jszlSjz)){ handleXinYiWarningRecordJS(jszlSjz, jsTp, BusinessEnum.WarningCategoryEnum.JS_ZL.getCode(), tXinyiIndustry, normConfig); } //进水COD超标报警 BigDecimal jsCod = tXinyiIndustry.getJsCod(); BigDecimal jscodSjz = normConfig.getJscodSjz(); if(!Objects.isNull(jscodSjz)){ handleXinYiWarningRecordJS(jscodSjz, jsCod, BusinessEnum.WarningCategoryEnum.JS_COD.getCode(), tXinyiIndustry, normConfig); } //进水总氮超标报警 BigDecimal jsTn = tXinyiIndustry.getJsTn(); BigDecimal jszdSjz = normConfig.getJszdSjz(); if(!Objects.isNull(jszdSjz)){ handleXinYiWarningRecordJS(jszdSjz, jsTn, BusinessEnum.WarningCategoryEnum.JS_ZD.getCode(), tXinyiIndustry, normConfig); } //进水氨氮超标报警 BigDecimal jsNh3 = tXinyiIndustry.getJsNh3(); BigDecimal jsadSjz = normConfig.getJsadSjz(); if(!Objects.isNull(jsadSjz)){ handleXinYiWarningRecordJS(jsadSjz, jsNh3, BusinessEnum.WarningCategoryEnum.JS_AD.getCode(), tXinyiIndustry, normConfig); } //进水SS超标报警 BigDecimal jsSS = tXinyiIndustry.getJsSs(); BigDecimal jsSSSjz = normConfig.getJsssSjz(); if(!Objects.isNull(jsSSSjz)){ handleXinYiWarningRecordJS(jsSSSjz, jsSS, BusinessEnum.WarningCategoryEnum.JS_SS.getCode(), tXinyiIndustry, normConfig); } } /** * 调用大模型获取决策结果 并同时记录对应信息到聊天记录表中 * * @param tXinyiWarningRecord * @param tXinyiIndustry * @param normConfig */ private void handleDecision(TXinyiWarningRecord tXinyiWarningRecord, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { log.info("进入了调⽤大模型决策接口"); ChatReq chatReq = new ChatReq(); // StringBuilder sb = new StringBuilder(); //大模型结果 放入一个结合中 List resultData = new ArrayList<>(); //决策和问答不一样 没有历史的概念 所以sessionId都是新的 次数都是1 String sessionId = IdUtils.simpleUUID(); String feedback = chatReq.getFeedback(); //决策请求的业务参数 // List> list = this.xinyiIndustryMapper.selectLast10RecordsForDecision(); //2024年5月21日15:23:07 这里不能用关联查询处理,日报要获取最新的一条而不是今日的数据。 List decisionReqs = this.xinyiIndustryMapper.selectLast10RecordsForDecisionOnlyIndustry(); if(!CollectionUtils.isEmpty(decisionReqs)){ for (DecisionReq decisionReq : decisionReqs) { //处理日报数据 TXinyiDaily daily = this.xinyiDailyMapper.selectNewestData(); if(!Objects.isNull(daily)){ decisionReq.setT(daily.getJsSw()); decisionReq.setSVI(daily.getWntjzsAll()); decisionReq.setSV(daily.getWncjbAll()); decisionReq.setMlvss(daily.getHfxwnndAll()); decisionReq.setDoAna(daily.getYycRjyAll()); decisionReq.setDoQue(daily.getQycRjyAll()); //2024年5月26日11:59:02 干污泥量数据做了同步 decisionReq.setGwnl(daily.getGWNL()); BigDecimal jsBod5 = daily.getJsBod5(); decisionReq.setBodIn(jsBod5); BigDecimal tpIn = decisionReq.getTpIn(); BigDecimal tnIn = decisionReq.getTnIn(); if(!Objects.isNull(jsBod5)){ if(!Objects.isNull(tpIn) && tpIn.compareTo(new BigDecimal("0")) >0){ decisionReq.setC(jsBod5.divide(tpIn, 4, RoundingMode.HALF_UP)); } if(!Objects.isNull(tnIn) && tnIn.compareTo(new BigDecimal("0")) >0){ decisionReq.setB(jsBod5.divide(tnIn, 4, RoundingMode.HALF_UP)); } } } } } String rows = JSON.toJSONString(decisionReqs, JSONWriter.Feature.WriteNulls); // 获取输出流 ManagedChannel channel = null; String dataJson = ""; try { channel = ManagedChannelBuilder.forAddress("10.0.0.24", 17070) .usePlaintext() .build(); InferenceAPIsServiceGrpc.InferenceAPIsServiceBlockingStub stub = InferenceAPIsServiceGrpc.newBlockingStub(channel); dataJson = "{\"bot_id\":\"b00001\",\"exp_id\":\"721\",\"norm\":\"" + tXinyiWarningRecord.getCategory() + "\",\"feedback\":" + feedback + ",\"session_id\":" + "\"" + sessionId + "\"" + ",\"generate_args\":{\"max_new_tokens\":1024,\"max_length\":4096,\"num_beams\":1,\"do_sample\":true,\"top_p\":0.7,\"temperature\":0.95},\"extra\":{\"rows\":" + rows + "}}"; log.info("请求大模型的决策的参数为{}", dataJson); PredictionsRequest request = PredictionsRequest.newBuilder() .setModelName("slibra_bot") .putInput("method", ByteString.copyFrom("decision_stream", "utf-8"))//推理 .putInput("data", ByteString.copyFrom(dataJson, "utf-8")) .buildPartial(); Iterator predictions = stub.streamPredictions(request); while (predictions.hasNext()) { String responseStr = predictions.next().getPrediction().toStringUtf8(); log.info("决策流式返回的结果是{}", responseStr); //2024年5月25日16:37:16 按照大模型返回的类型解析数据 String biz = JSON.parseObject(responseStr).getString("biz"); if(BusinessEnum.BigModelBizEnum.OK.getCode().equals(biz)){ log.info("结尾语句并且是非JSON,无需处理"); //结束语句也流式输出,但是并不记录下来 2024年5月24日11:15:23 也不返回前端 /*outputStream.write(responseStr.getBytes()); outputStream.flush();*/ }else if(BusinessEnum.BigModelBizEnum.DECISION_DEBUGGER.getCode().equals(biz)){ log.info("中间过程,目前只打印日志,不记录数据,也不返回给前端,返回数据为{}", responseStr); //结束语句也流式输出,但是并不记录下来 2024年5月24日11:15:23 也不返回前端 /*outputStream.write(responseStr.getBytes()); outputStream.flush();*/ }else{//其他 要么错误 要么alert 要么出的报告 // sb.append(responseStr); resultData.add(responseStr); } } } catch (Exception e) { // throw new RuntimeException(e); log.error("定时任务处理告警调用决策异常,异常信息为{}", JSON.toJSONString(e)); resultData.add("{\"biz\":\"ERROR\",\"message\":\"大模型分析数据异常,请稍后再试\"}"); } finally { log.info("决策最终要保存的数据是{}", JSON.toJSONString(resultData)); //保存聊天记录 //将问答更新到数据库中 chatReq.setSessionId(sessionId); chatReq.setType(1);//0问答 1决策 chatReq.setModule(3); /*String userId = SecurityUtils.getUserId().toString(); String username = SecurityUtils.getUsername();*/ chatReq.setUserId(WARNING_DEFAULT_CREATE); String showVal = this.buildShowValue(tXinyiWarningRecord, tXinyiIndustry, normConfig); chatReq.setShowVal(showVal);//前端展示的数据和提问的数据不一致 chatReq.setQuestion(dataJson); chatReq.setAnswer(JSON.toJSONString(resultData)); chatReq.setWarningId(String.valueOf(tXinyiWarningRecord.getId())); chatReq.setCounts(1);//问答次数 chatReq.setCreateBy(WARNING_DEFAULT_CREATE); chatReq.setCreateTime(DateUtils.getNowDate()); this.xinyiChatRecordMapper.insertTXinyiChatRecord(chatReq); // 关闭输出流 if(!Objects.isNull(channel)) channel.shutdown(); } } private String buildShowValue(TXinyiWarningRecord tXinyiWarningRecord, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { JSONObject result = new JSONObject(); JSONObject basic = new JSONObject(); Integer status = tXinyiWarningRecord.getStatus(); Date warningTime = tXinyiWarningRecord.getTime(); int count = DateUtils.differentHoursByMillisecond(warningTime, DateUtils.getNowDate()) + 1; basic.put("title", tXinyiWarningRecord.getReason()); basic.put("报警时间", DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM ,warningTime)); basic.put("报警值", tXinyiWarningRecord.getWarningVal()); basic.put("标准值", tXinyiWarningRecord.getDesignVal()); basic.put("管控值", tXinyiWarningRecord.getControlVal()); basic.put("报警次数", Math.min(count, MAX_COUNT)); if(tXinyiWarningRecord.getType() != 2) basic.put("状态", status == 0 ? "报警中" : status == 1 ? "用户关闭" : status == 2 ? "系统关闭" : "应急处理中"); else basic.put("状态", status == 0 ? "预警中" : "已完成"); //2024年5月27日14:04:22 额外返回2个字段 [管控值 和 告警级别] 返回的json没有数据是因为value没有值 // basic.put("管控值", tXinyiWarningRecord.getControlVal()); basic.put("告警级别", tXinyiWarningRecord.getLevel()); result.put("basic", basic); JSONObject jsData = getJsonObject(tXinyiIndustry, normConfig);//进水数据 result.put("jsData", jsData); JSONObject csData = getCsonObject(tXinyiIndustry, normConfig);//出水数据 result.put("csData", csData); return JSON.toJSONString(result, JSONWriter.Feature.WriteNulls); } private static JSONObject getJsonObject(TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { JSONObject jsData = new JSONObject(); HashMap temp1 = new HashMap<>(); BigDecimal jsSlq = tXinyiIndustry.getJsSlq(); temp1.put("value", jsSlq); temp1.put("exceed", jsSlq.compareTo(normConfig.getJsslSjz()) >0); jsData.put("流量", temp1); HashMap temp2 = new HashMap<>(); BigDecimal jsCod = tXinyiIndustry.getJsCod(); temp2.put("value", jsCod); temp2.put("exceed", jsCod.compareTo(normConfig.getJscodSjz()) > 0); jsData.put("COD", temp2); HashMap temp3 = new HashMap<>(); BigDecimal jsNh3 = tXinyiIndustry.getJsNh3(); temp3.put("value", jsNh3); temp3.put("exceed", jsNh3.compareTo(normConfig.getJsadSjz()) > 0); jsData.put("NH3-N", temp3); HashMap temp4 = new HashMap<>(); BigDecimal jsTp = tXinyiIndustry.getJsTp(); temp4.put("value", jsTp); temp4.put("exceed", jsTp.compareTo(normConfig.getJszlSjz()) > 0); jsData.put("TP", temp4); HashMap temp5 = new HashMap<>(); BigDecimal jsSs = tXinyiIndustry.getJsSs(); temp5.put("value", jsSs); temp5.put("exceed", jsSs.compareTo(normConfig.getJsssSjz()) > 0); jsData.put("SS", temp5); HashMap temp6 = new HashMap<>(); BigDecimal jsTn = tXinyiIndustry.getJsTn(); temp6.put("value", jsTn); temp6.put("exceed", jsTn.compareTo(normConfig.getJszdSjz()) > 0); jsData.put("TN", temp6); return jsData; } private static JSONObject getCsonObject(TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { JSONObject csData = new JSONObject(); HashMap temp1 = new HashMap<>(); BigDecimal csSlq = tXinyiIndustry.getCsSlqc(); temp1.put("value", csSlq); temp1.put("exceed", false);//出水水量没有管控值 csData.put("流量", temp1); HashMap temp2 = new HashMap<>(); BigDecimal csCod = tXinyiIndustry.getCsCod(); temp2.put("value", csCod); temp2.put("exceed", csCod.compareTo(normConfig.getCscodGkz()) > 0); csData.put("COD", temp2); HashMap temp3 = new HashMap<>(); BigDecimal csNh3 = tXinyiIndustry.getCsNh3(); temp3.put("value", csNh3); temp3.put("exceed", csNh3.compareTo(normConfig.getCsadGkz()) > 0); csData.put("NH3-N", temp3); HashMap temp4 = new HashMap<>(); BigDecimal csTp = tXinyiIndustry.getCsTp(); temp4.put("value", csTp); temp4.put("exceed", csTp.compareTo(normConfig.getCszlGkz()) > 0); csData.put("TP", temp4); HashMap temp5 = new HashMap<>(); BigDecimal csSs = tXinyiIndustry.getCsSs(); temp5.put("value", csSs); temp5.put("exceed", csSs.compareTo(normConfig.getCsssGkz()) > 0); csData.put("SS", temp5); HashMap temp6 = new HashMap<>(); BigDecimal csTn = tXinyiIndustry.getCsTn(); temp6.put("value", csTn); temp6.put("exceed", csTn.compareTo(normConfig.getCszzGkz()) > 0); csData.put("TN", temp6); return csData; } /** * 通过输入的值 生成对应类型的报警对象(出水) * * @param csBzz * @param currentVal * @param csGkz * @param category * @param tXinyiIndustry * @param normConfig * @return */ private void handleXinYiWarningsCs(BigDecimal csBzz, BigDecimal currentVal, BigDecimal csGkz, String category, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { BigDecimal multiply = csBzz.multiply(new BigDecimal(MyConstants.SCALE_VALUE)); TXinyiWarningRecord tXinyiWarningRecord = new TXinyiWarningRecord(); /*String category = BusinessEnum.WarningCategoryEnum.CS_AD.getCode();*/ tXinyiWarningRecord.setStatus(0); tXinyiWarningRecord.setType(0); tXinyiWarningRecord.setCategory(category); tXinyiWarningRecord.setTime(DateUtils.getNowDate()); tXinyiWarningRecord.setWarningVal(currentVal); tXinyiWarningRecord.setDesignVal(csBzz); tXinyiWarningRecord.setControlVal(csGkz); tXinyiWarningRecord.setCreateBy(WARNING_DEFAULT_CREATE); tXinyiWarningRecord.setCreateTime(DateUtils.getNowDate()); //2024年5月25日17:52:33 如果工业库获取不到数据,也触发报警,但是不调用决策接口 if(Objects.isNull(currentVal)){ tXinyiWarningRecord.setReason(category + EXCEPTION_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_NO_DATE); }else if(currentVal.compareTo(multiply) > 0){//一级 tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_ONE); }else if(currentVal.compareTo(csBzz) >= 0 && currentVal.compareTo(multiply) <= 0){//二级 tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_TWO); }else if(!Objects.isNull(csGkz) && currentVal.compareTo(csGkz) > 0){ tXinyiWarningRecord.setReason(category + CHAOGUANKONG_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_THREE); }else{ tXinyiWarningRecord = null;//这种的无需处理 } //当前状态正常 需要查询历史有无正在报警的数据,如果有,将报警状态改完2(系统自动关闭) List tXinyiWarningRecords = this.xinyiWarningRecordMapper.selectTXinyiWarningRecordList(TXinyiWarningRecord.builder().delFlag(0).type(0).category(category).warningStatus(0).build()); if(Objects.isNull(tXinyiWarningRecord)){//数据正常,无告警信息 if(!CollectionUtils.isEmpty(tXinyiWarningRecords)){ log.info( "{}:现在恢复正常,历史报警数据为{}", category,JSON.toJSONString(tXinyiWarningRecords)); for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) { xinyiWarningRecord.setStatus(2); Date nowDate = DateUtils.getNowDate(); xinyiWarningRecord.setOffTime(nowDate); xinyiWarningRecord.setUpdateTime(nowDate); xinyiWarningRecord.setUpdateBy(WARNING_DEFAULT_CREATE); this.xinyiWarningRecordMapper.updateTXinyiWarningRecord(xinyiWarningRecord); } } }else{//有告警信息 if(CollectionUtils.isEmpty(tXinyiWarningRecords)){//之前没有告警记录 //保存到数据库中 this.xinyiWarningRecordMapper.insertTXinyiWarningRecord(tXinyiWarningRecord); if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的 this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); }else { //继续调用决策 this.handleDecision(tXinyiWarningRecord, tXinyiIndustry, normConfig); } }else{ log.info("{}:之前已经有过告警记录了,且还是继续报警,无需重复添加报警,但是决策仍然要调用", category); for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) {//理论上只有一个的 if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的 this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); }else { //继续调用决策 this.handleDecision(xinyiWarningRecord, tXinyiIndustry, normConfig); } } } } } /** * 通过输入的值 生成对应类型的报警对象(进水) * * @param jsBzz * @param currentVal * @param category * @param tXinyiIndustry * @param normConfig * @return */ private void handleXinYiWarningRecordJS(BigDecimal jsBzz, BigDecimal currentVal, String category, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { BigDecimal multiply = jsBzz.multiply(new BigDecimal(MyConstants.SCALE_VALUE)); TXinyiWarningRecord tXinyiWarningRecord = new TXinyiWarningRecord(); /*String category = BusinessEnum.WarningCategoryEnum.CS_AD.getCode();*/ tXinyiWarningRecord.setStatus(0); tXinyiWarningRecord.setType(0); tXinyiWarningRecord.setCategory(category); tXinyiWarningRecord.setTime(DateUtils.getNowDate()); tXinyiWarningRecord.setWarningVal(currentVal); tXinyiWarningRecord.setDesignVal(jsBzz); // tXinyiWarningRecord.setControlVal(csGkz); tXinyiWarningRecord.setCreateBy(WARNING_DEFAULT_CREATE); tXinyiWarningRecord.setCreateTime(DateUtils.getNowDate()); //2024年5月25日17:52:33 如果工业库获取不到数据,也触发报警,但是不调用决策接口 if(Objects.isNull(currentVal)){ tXinyiWarningRecord.setReason(category + EXCEPTION_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_NO_DATE); }else if(currentVal.compareTo(multiply) > 0){//一级 tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_ONE); }else if(currentVal.compareTo(jsBzz) >= 0 && currentVal.compareTo(multiply) <= 0){//二级 tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_TWO); }else{ tXinyiWarningRecord = null;//这种的无需处理 } /*else if(!Objects.isNull(csGkz) && currentVal.compareTo(csGkz) > 0){ tXinyiWarningRecord.setReason(category + CHAOGUANKONG_WARNING); tXinyiWarningRecord.setLevel(WARNING_LEVEL_THREE); }*/ //当前状态正常 需要查询历史有无正在报警的数据,如果有,将报警状态改完2(系统自动关闭) List tXinyiWarningRecords = this.xinyiWarningRecordMapper.selectTXinyiWarningRecordList(TXinyiWarningRecord.builder().delFlag(0).type(0).category(category).status(0).build()); if(Objects.isNull(tXinyiWarningRecord)){//数据正常,无告警信息 if(!CollectionUtils.isEmpty(tXinyiWarningRecords)){ log.info( "{}:现在恢复正常,历史报警数据为{}", category,JSON.toJSONString(tXinyiWarningRecords)); for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) { xinyiWarningRecord.setStatus(2); Date nowDate = DateUtils.getNowDate(); xinyiWarningRecord.setOffTime(nowDate); xinyiWarningRecord.setUpdateTime(nowDate); xinyiWarningRecord.setUpdateBy(WARNING_DEFAULT_CREATE); this.xinyiWarningRecordMapper.updateTXinyiWarningRecord(xinyiWarningRecord); } } }else{//有告警信息 if(CollectionUtils.isEmpty(tXinyiWarningRecords)){//之前没有告警记录 //保存到数据库中 this.xinyiWarningRecordMapper.insertTXinyiWarningRecord(tXinyiWarningRecord); if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的 this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); }else { //继续调用决策 this.handleDecision(tXinyiWarningRecord, tXinyiIndustry, normConfig); } }else{ log.info("{}:之前已经有过告警记录了,且还是继续报警,无需重复添加报警,但是决策仍然要调用", category); for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) {//理论上只有一个的 if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的 this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig); }else { //继续调用决策 this.handleDecision(xinyiWarningRecord, tXinyiIndustry, normConfig); } } } } } private void addChatRecordByWarning(TXinyiWarningRecord tXinyiWarningRecord, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) { ChatReq chatReq = new ChatReq(); //保存聊天记录 //将问答更新到数据库中 chatReq.setSessionId(IdUtils.simpleUUID()); chatReq.setType(1);//0问答 1决策 2本地 3仿真预测 chatReq.setModule(3); /*String userId = SecurityUtils.getUserId().toString(); String username = SecurityUtils.getUsername();*/ chatReq.setUserId(WARNING_DEFAULT_CREATE); String showVal = this.buildShowValue(tXinyiWarningRecord, tXinyiIndustry, normConfig); chatReq.setShowVal(showVal);//前端展示的数据和提问的数据不一致 chatReq.setQuestion(WARNING_DEFAULT_QUESTION);//本地问题 chatReq.setAnswer(tXinyiWarningRecord.getReason() + ",请检查设备是否正常运行"); chatReq.setWarningId(String.valueOf(tXinyiWarningRecord.getId())); chatReq.setCounts(1);//问答次数 chatReq.setCreateBy(WARNING_DEFAULT_CREATE); chatReq.setCreateTime(DateUtils.getNowDate()); this.xinyiChatRecordMapper.insertTXinyiChatRecord(chatReq); } /** * 定时从sqlserver获取数据 */ public void sqlserverData(){ log.info("进入了定时同步SqlServer的任务"); //主库获取上次最新的同步日期 String lastTime = this.xinyiRobotMapper.selectLastTime(); log.info("上次同步的日期是{}", lastTime); //从 DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.SLAVE.name()); List tXinyiRobots = xinyiRobotMapper.selectTXinyiRobotListByTime(lastTime); DynamicDataSourceContextHolder.clearDataSourceType(); // System.out.println(JSON.toJSONString(tXinyiRobots)); // System.out.println("-------------"); //主 if(!CollectionUtils.isEmpty(tXinyiRobots)){ for (TXinyiRobot tXinyiRobot : tXinyiRobots) { String date = handleDate(tXinyiRobot.getVDate().replaceAll(" ", ""));//有空格 String time = handleDate(tXinyiRobot.getVTime().replaceAll(" ", ""));//有空格 tXinyiRobot.setVDate(date); tXinyiRobot.setVTime(time); tXinyiRobot.setVDateTime(date + " " + time); //处理给前端展示的字段 tXinyiRobot.setTestDate(date);//日期 tXinyiRobot.setTestHour(date + " " + time.substring(0, 2));//小时 tXinyiRobot.setTestTime(date + " " + time.substring(0, 5));//分钟 tXinyiRobot.setCreatedTime(new Date()); //2024年5月29日10:33:32 额外处理几个新增的字段 多个池子数据合并一个 List extraList = new ArrayList<>(); extraList.add(tXinyiRobot.getNo3Hlj1Jqr()); extraList.add(tXinyiRobot.getNo3Hlj2Jqr()); tXinyiRobot.setHycxsyAll(JSON.toJSONString(extraList)); extraList.clear(); extraList.add(tXinyiRobot.getNh31Jqr()); extraList.add(tXinyiRobot.getNh32Jqr()); tXinyiRobot.setQyanAll(JSON.toJSONString(extraList)); extraList.clear(); extraList.add(tXinyiRobot.getNo3Qyc1Jqr()); extraList.add(tXinyiRobot.getNo3Qyc2Jqr()); tXinyiRobot.setQyckxsyAll(JSON.toJSONString(extraList)); extraList.clear(); extraList.add(tXinyiRobot.getTpHl1Jqr()); extraList.add(tXinyiRobot.getTpHl2Jqr()); tXinyiRobot.setHyzlsyAll(JSON.toJSONString(extraList)); this.xinyiRobotMapper.insertTXinyiRobot(tXinyiRobot); } } } /** * * 定时生成每日简报数据 * */ public void generageShortReport(){ log.info("进入了定时生成每日简报数据"); List dailyTwoRecords = this.xinyiDailyMapper.selectNRecords(2); //正常不会有这种问题 因为日报有很多条 if(CollectionUtils.isEmpty(dailyTwoRecords) || dailyTwoRecords.size() < 2){ log.error("进入了定时生成每日简报数据 获取最新的2条数据不足,终止"); return; } //暂时不考虑因为没有填写日报 导致生成重复数据的问题(后续需要的话再添加) //处理数据 并 拼装 String queryData = buildShortReportQueryData(dailyTwoRecords); log.info("定时生成简报,组装好的请求大模型的参数为:{}", queryData); if(StringUtils.isBlank(queryData)){ log.error("无法拼装请求数据!!!!!!"); return; } String showVal = formateDateStr(dailyTwoRecords.get(0).getTestDate()) + JIAN_BAO_END; //调用模型 并保存结果 this.askBigModel(queryData, showVal); log.info("定时生成简报任务结束~~~~~~~~~~~~~~"); } /** * * 2022/01/01 转成2022年01月01日 数据 * @param testDate * @return */ private String formateDateStr(String testDate) { if(StringUtils.isBlank(testDate)) return ""; if(!testDate.contains("/")) return testDate; String[] split = testDate.split("/"); return split[0] + "年" + split[1] + "月" + split[2] + "日"; } private void askBigModel(String question, String showVal) { log.info("进入了后台接口调⽤⼤模型获取问答结果处理"); StringBuilder sb = new StringBuilder(); String sessionId = IdUtils.simpleUUID(); ChatReq chatReq = new ChatReq(); // String ipAddr = IpUtils.getIpAddr();//获取用户的ip地址 传给大模型 String ipAddr = "";//获取用户的ip地址 传给大模型 定时任务获取不到ip地址 int counts = 1;//默认是第一次 //这种问答 没有历史问答的概念 直接把问题扔进去就行 无需查询历史记录 List historyDates = new ArrayList<>(); historyDates.add(question); // 获取输出流 ManagedChannel channel = null; try { channel = ManagedChannelBuilder.forAddress("10.0.0.24", 17070) .usePlaintext() .build(); InferenceAPIsServiceGrpc.InferenceAPIsServiceBlockingStub stub = InferenceAPIsServiceGrpc.newBlockingStub(channel); String dataJson = "{\"bot_id\":\"721\",\"exp_id\":\"721\",\"session_id\":\"" + sessionId + "\",\"use_rag\":\"true\",\"prompt\":\"你是⼀个资深⽔务领域专家,能回答各种⽔务相关问题\",\"history_dia\":" + JSON.toJSONString(historyDates) + ",\"generate_args\":{\"max_new_tokens\":2048,\"max_length\":4096,\"num_beams\":1,\"do_sample\":true,\"top_p\":0.7,\"temperature\":0.95},\"extra\":{ \"ip_address\": \"" + ipAddr + "\" },\"strengthen\":" + (true) + "}"; log.info("请求大模型的问答参数为{}", dataJson); PredictionsRequest request = PredictionsRequest.newBuilder() .setModelName("slibra_bot") .putInput("method", ByteString.copyFrom("infer_stream", "utf-8"))//推理 .putInput("data", ByteString.copyFrom(dataJson, "utf-8")) .buildPartial(); Iterator predictions = stub.streamPredictions(request); //将结果记录到问答表 while (predictions.hasNext()) { String responseStr = predictions.next().getPrediction().toStringUtf8(); log.info("大模型问答返回的原始结果为{}", responseStr); responseStr = JSON.parseObject(responseStr).getString("message"); if("complete".equals(responseStr)){ System.out.println("结尾语句并且是非JSON,无需处理"); //结束语句也流式输出,但是并不记录下来 2024年5月24日11:15:23 也不返回前端 /*outputStream.write(responseStr.getBytes()); outputStream.flush();*/ }else{ sb.append(responseStr); } } //将问答更新到数据库中 chatReq.setSessionId(sessionId); chatReq.setType(2);//0问答 1决策 2本地 3仿真预测 chatReq.setModule(4);//0专家问答 1智能工单 2智能体助手 3告警 4简报 chatReq.setShowVal(showVal); chatReq.setQuestion(question); chatReq.setAnswer(sb.toString()); chatReq.setCounts(counts);//问答次数 chatReq.setUserId(WARNING_DEFAULT_CREATE); chatReq.setCreateBy(WARNING_DEFAULT_CREATE); chatReq.setCreateTime(DateUtils.getNowDate()); this.xinyiChatRecordMapper.insertTXinyiChatRecord(chatReq); } catch (IOException e) { throw new RuntimeException(e); } finally { // 关闭输出流 channel.shutdown(); } } private String buildShortReportQueryData(List dailyTwoRecords) { //查询配置信息 List tXinyiNormConfigs = this.xinyiNormConfigMapper.selectTXinyiNormConfigList(null); if(CollectionUtils.isEmpty(tXinyiNormConfigs)) return null; TXinyiNormConfig normConfig = tXinyiNormConfigs.get(0); //获取数据 TXinyiDaily yesterdayData = dailyTwoRecords.get(0); TXinyiDaily beforeYesterdayData = dailyTwoRecords.get(1); String originStr = JIAN_BAO_PROMPT; String yesterdayStr = yesterdayData.getTestDate().substring(5).replace("/", "月") + "日"; String beforeYesterdayStr = beforeYesterdayData.getTestDate().substring(5).replace("/", "月") + "日"; originStr = originStr.replace("#{0}", yesterdayStr); originStr = originStr.replace("#{1}", beforeYesterdayStr); originStr = originStr.replace("#{2}", String.valueOf(yesterdayData.getJsCod())); originStr = originStr.replace("#{3}", String.valueOf(yesterdayData.getJsTn())); originStr = originStr.replace("#{4}", String.valueOf(yesterdayData.getJsTp())); originStr = originStr.replace("#{5}", String.valueOf(yesterdayData.getJsNh3())); originStr = originStr.replace("#{6}", String.valueOf(yesterdayData.getJsSs())); originStr = originStr.replace("#{7}", String.valueOf(yesterdayData.getJSL())); originStr = originStr.replace("#{8}", String.valueOf(yesterdayData.getCsCod())); originStr = originStr.replace("#{9}", String.valueOf(yesterdayData.getCsTn())); originStr = originStr.replace("#{10}", String.valueOf(yesterdayData.getCsTp())); originStr = originStr.replace("#{11}", String.valueOf(yesterdayData.getCsNh3())); originStr = originStr.replace("#{12}", String.valueOf(yesterdayData.getCsSs())); originStr = originStr.replace("#{13}", String.valueOf(yesterdayData.getCSL())); originStr = originStr.replace("#{14}", String.valueOf(beforeYesterdayData.getJsCod())); originStr = originStr.replace("#{15}", String.valueOf(beforeYesterdayData.getJsTn())); originStr = originStr.replace("#{16}", String.valueOf(beforeYesterdayData.getJsTp())); originStr = originStr.replace("#{17}", String.valueOf(beforeYesterdayData.getJsNh3())); originStr = originStr.replace("#{18}", String.valueOf(beforeYesterdayData.getJsSs())); originStr = originStr.replace("#{19}", String.valueOf(beforeYesterdayData.getJSL())); originStr = originStr.replace("#{20}", String.valueOf(beforeYesterdayData.getCsCod())); originStr = originStr.replace("#{21}", String.valueOf(beforeYesterdayData.getCsTn())); originStr = originStr.replace("#{22}", String.valueOf(beforeYesterdayData.getCsTp())); originStr = originStr.replace("#{23}", String.valueOf(beforeYesterdayData.getCsNh3())); originStr = originStr.replace("#{24}", String.valueOf(beforeYesterdayData.getCsSs())); originStr = originStr.replace("#{25}", String.valueOf(beforeYesterdayData.getCSL())); originStr = originStr.replace("#{26}", String.valueOf(normConfig.getJscodSjz())); originStr = originStr.replace("#{27}", String.valueOf(normConfig.getJszdSjz())); originStr = originStr.replace("#{28}", String.valueOf(normConfig.getJszlSjz())); originStr = originStr.replace("#{29}", String.valueOf(normConfig.getJsadSjz())); originStr = originStr.replace("#{30}", String.valueOf(normConfig.getJsssSjz())); originStr = originStr.replace("#{31}", String.valueOf(normConfig.getCscodBzz())); originStr = originStr.replace("#{32}", String.valueOf(normConfig.getCszzBzz())); originStr = originStr.replace("#{33}", String.valueOf(normConfig.getCszlBzz())); originStr = originStr.replace("#{34}", String.valueOf(normConfig.getCsadBzz())); originStr = originStr.replace("#{35}", String.valueOf(normConfig.getCsssBzz())); return originStr; } public static String handleDate(String str){ StringBuilder sb = new StringBuilder(); if(str.contains(" ")){//包含空格 就是年月日时分秒了 String[] split = str.split(" "); addBeforeZero(sb, split[0], "/"); sb.append(" "); addBeforeZero(sb, split[1], ":"); }else{ if(str.contains("/")){//年月日 addBeforeZero(sb, str, "/"); }else if(str.contains(":")){//时分秒 addBeforeZero(sb, str, ":"); }else { sb.append(str); } } return sb.toString(); } public static StringBuilder addBeforeZero(StringBuilder sb, String str, String tag){ String[] split = str.split(tag); int length = split.length; for (int i = 0; i < length; i++) { String value = split[i]; Integer intValue = Integer.parseInt(value); if(intValue < 10 && value.length() == 1){////防止有正确的情况 额外再补充字符串 sb.append(0).append(value); }else{ sb.append(value); } if(i < length-1){ sb.append(tag); } } return sb; } public static void main(String[] args) { /*LocalDateTime endTime = LocalDateTime.now(); System.out.println("endTime = " + endTime); endTime = endTime.plusMinutes(60); System.out.println("endTime = " + endTime);*/ /*String str = "2024/04/15 09:55"; System.out.println(str); System.out.println(str.substring(0,10)); System.out.println(str.substring(0,13));*/ /*String str = "2024/04/18 08:00"; str = str + ":00"; System.out.println(str); LocalDateTime startTime = LocalDateTime.parse(str.replaceAll("/", "-").replace(" ", "T")); System.out.println(startTime.plusMinutes(1L).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));*/ /*ArrayList objects = new ArrayList<>(); objects.add(1); objects.add(2); objects.add(3); System.out.println(objects); objects.clear(); System.out.println(objects); // test(); Date date = new Date(); System.out.println(DateUtils.differentHoursByMillisecond(date, date)); HashMap map = new HashMap<>(); map.put("a", null); map.put("b", "ab"); map.put("c", ""); map.put("d", '1'); System.out.println(JSON.toJSONString(map, JSONWriter.Feature.WriteNulls)); TXinyiIndustry tXinyiIndustry = new TXinyiIndustry(); System.out.println(JSON.toJSONString(tXinyiIndustry, JSONWriter.Feature.WriteNulls));*/ String s = "2022/01/01".substring(5).replace("/", "月") + "日"; System.out.println("s = " + s); System.out.println(new BigDecimal("1").compareTo(null));//空指针 要判断 } //测试工业库 没小时保存一条记录是否可行 public static void test() { // 每个小时的时间格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LocalDateTime startTime = LocalDateTime.parse("2024-05-18T00:00:00"); LocalDateTime endTime = LocalDateTime.now(); // 循环按小时分割 LocalDateTime currentHour = startTime; //最终获取的数据 Map needMap = new LinkedHashMap<>(); while (currentHour.isBefore(endTime)) { String begin = currentHour.format(formatter); String end = currentHour.plusMinutes(5).format(formatter); // 输出当前小时的起始时间和结束时间 System.out.println("起始时间:" + begin); System.out.println("结束时间:" + end); // 当前小时加一小时,作为下一个小时的起始时间 currentHour = currentHour.plusMinutes(5); //每个小时查询一次数据 String url = "http://10.0.0.27:4568/api/v1/khquerydata"; HashMap req = new HashMap<>(); req.put("tagNames", queryTags); req.put("startTime", begin); req.put("endTime", end); req.put("recordNumbers", 100000); String body = HttpRequest.post(url).header("Authorization", "c2E6c2E=").header("clientName", "hongshan").body(JSON.toJSONString(req)).execute().body(); // System.out.println("body = " + body); List> list = new ArrayList<>(); //行转列数据处理 for (String queryTag : queryTags) { JSONArray array = JSON.parseObject(body).getJSONArray(queryTag); //特殊数据处理一 if(Objects.isNull(array) || array.isEmpty()){ System.out.println(queryTag + "查询到了空的数据,跳过本次循环"); continue; } int size = array.size(); //特殊数据处理二 if("0".equals(array.get(1) + "")){ System.out.println(queryTag + "查询到了数据,但是数据集合只有一条,且都是0"); continue; } //结合至少62个数据才满足条件(有可能获取不到) /*if(size < 62){ System.out.println(queryTag + "查询到了不符合条件的数据,跳过本次循环"); continue; }*/ //存放的数据集 //利用map去重 HashMap map = new LinkedHashMap<>(); for (int i = 2; i < size; i++) { // System.out.println(i + "" + array.get(i)); JSONArray oneRecord = JSON.parseArray(JSON.toJSONString(array.get(i))); //处理为空或者为0的数据 Object timeStampValue = oneRecord.get(2); if(Objects.isNull(timeStampValue) || "0".equals(timeStampValue + "")) continue; BigDecimal value = Objects.isNull(oneRecord.get(0)) ? null : new BigDecimal(oneRecord.get(0) + ""); long timestamp = (long) timeStampValue; String format = DateUtil.format(new Date(timestamp), DateUtils.YYYYMMDDHH_TS); map.put(format, queryTag + "-" + value); } list.add(map); } Set recordTimeSet = new HashSet<>(); Map recordMap = new HashMap<>(); for (int i = 0; i < list.size(); i++) { HashMap map = list.get(i); int finalJ = i; map.forEach((k, v) ->{ TXinyiIndustry industry = null; if(!recordTimeSet.contains(k)){//第一次 industry = new TXinyiIndustry(); recordTimeSet.add(k); recordMap.put(k, industry); }else{ industry = recordMap.get(k); } industry.setTestTime(k + ":00"); //2024年4月15日11:19:52 额外增加2个字段 industry.setTestDate(k.substring(0,10)); industry.setTestHour(k.substring(0,13)); //解析值 String[] split = v.split("-"); String type = split[0]; BigDecimal value = new BigDecimal(split[1]); if ("信义污水厂JS_COD_Value".equals(type)) { industry.setJsCod(value); } else if ("信义污水厂JS_PH_Value".equals(type)) { industry.setJsPh(value); } else if ("信义污水厂JS_SS_Value".equals(type)) { industry.setJsSs(value); } else if ("信义污水厂JS_ZL_Value".equals(type)) { industry.setJsTp(value); } else if ("信义污水厂JS_ZA_Value".equals(type)) { industry.setJsTn(value); } else if ("信义污水厂JS_AD_Value".equals(type)) { industry.setJsNh3(value); } else if ("信义污水厂JS_T_Value".equals(type)) { industry.setJsSwPh(value); } else if ("信义污水厂进水泵房液位".equals(type)) { industry.setJsBfyw(value); } else if ("信义污水厂出水瞬时流量".equals(type)) { industry.setCsSlqc(value); } else if ("信义污水厂升级出水COD".equals(type)) { industry.setCsCod(value); } else if ("信义污水厂升级出水PH".equals(type)) { industry.setCsPh(value); } else if ("信义污水厂升级出水SS".equals(type)) { industry.setCsSs(value); } else if ("信义污水厂升级出水TN".equals(type)) { industry.setCsTn(value); } else if ("信义污水厂升级出水TP".equals(type)) { industry.setCsTp(value); } else if ("信义污水厂升级出水氨氮".equals(type)) { industry.setCsNh3(value); } else if ("信义污水厂AIT202_Value".equals(type)) { industry.setOneHyzdDo(value); } else if ("信义污水厂AIT203_Value".equals(type)) { industry.setOneHymdDo(value); } else if ("信义污水厂AIT207_Value".equals(type)) { industry.setTwoHyzdDo(value); } else if ("信义污水厂AIT206_Value".equals(type)) { industry.setTwoHymdDo(value); } else if ("信义污水厂AIT209_Value".equals(type)) { industry.setOneMlss(value); } else if ("信义污水厂AIT210_Value".equals(type)) { industry.setTwoMlss(value); } else if ("信义污水厂进水TDS".equals(type)) { industry.setJsTds(value); } else if ("信义污水厂FT101_Value".equals(type)) { industry.setJsSlq(value); } else if ("信义污水厂SWCHHYHLB1_R_Value".equals(type)) { industry.setNHlbOneGp(value); } else if ("信义污水厂SWCHHYHLB2_R_Value".equals(type)) { industry.setNHlbTwoGp(value); } else if ("信义污水厂SWCHHYHLB3_R_Value".equals(type)) { industry.setNHlbThreeGp(value); } else if ("信义污水厂SWCHHYHLB4_R_Value".equals(type)) { industry.setNHlbFourGp(value); } else if ("信义污水厂SWCHHYHLB5_R_Value".equals(type)) { industry.setNhlBFiveGp(value); } else if ("信义污水厂SWCHHYHLB6_R_Value".equals(type)) { industry.setNHlbSixGp(value); } else if ("信义污水厂SWCWNHLB1_R_Value".equals(type)) { industry.setWHlbOneGp(value); } else if ("信义污水厂SWCWNHLB2_R_Value".equals(type)) { industry.setWHlbTwoGp(value); } else if ("信义污水厂SWCWNHLB3_R_Value".equals(type)) { industry.setWHlbThreeGp(value); } else if ("信义污水厂SWCWNHLB4_R_Value".equals(type)) { industry.setWHlbFourGp(value); } else if ("信义污水厂SWCWNHLB5_R_Value".equals(type)) { industry.setWHlbFiveGp(value); } else if ("信义污水厂GFJ1_R_Value".equals(type)) { industry.setFjOne(value); } else if ("信义污水厂GFJ2_R_Value".equals(type)) { industry.setFjTwo(value); } else if ("信义污水厂GFJ3_R_Value".equals(type)) { industry.setFjThree(value); } else if ("信义污水厂GFJ4_R_Value".equals(type)) { industry.setFjFour(value); } else if ("信义污水厂GFJ5_R_Value".equals(type)) { industry.setFjFive(value); } else if ("信义污水厂GFJ6_R_Value".equals(type)) { industry.setFjSix(value); } else if ("信义污水厂GFJ1_KQLL_Value".equals(type)) { industry.setKqllOne(value); } else if ("信义污水厂GFJ2_KQLL_Value".equals(type)) { industry.setKqllTwo(value); } else if ("信义污水厂GFJ3_KQLL_Value".equals(type)) { industry.setKqllThree(value); } else if ("信义污水厂GFJ4_KQLL_Value".equals(type)) { industry.setKqllFour(value); } else if ("信义污水厂GFJ5_KQLL_Value".equals(type)) { industry.setKqllFive(value); } else if ("信义污水厂GFJ6_KQLL_Value".equals(type)) { industry.setKqllSix(value); }else if ("信义污水厂实际碳源加药量".equals(type)) { industry.setSJTYJLY(value); }else if ("信义污水厂除磷加药瞬时流量".equals(type)) { industry.setCLJYSSLL(value); } else if ("信义污水厂_除磷P04预测值_".equals(type)) { industry.setCLP04YCZ(value); } //只有最后一次才执行数据库添加 if(finalJ == list.size()-1){ needMap.put(industry.getTestHour(), industry); } }); } } //保存数据 触发告警 决策 问答记录等等 needMap.forEach((k, industry) ->{ System.out.println(JSON.toJSONString(industry)); }); } }