|
@@ -15,7 +15,6 @@ import com.slibra.common.constant.MyConstants;
|
|
|
import com.slibra.common.enums.BusinessEnum;
|
|
|
import com.slibra.common.enums.DataSourceType;
|
|
|
import com.slibra.common.utils.DateUtils;
|
|
|
-import com.slibra.common.utils.ip.IpUtils;
|
|
|
import com.slibra.common.utils.uuid.IdUtils;
|
|
|
import com.slibra.framework.datasource.DynamicDataSourceContextHolder;
|
|
|
import inference.InferenceAPIsServiceGrpc;
|
|
@@ -35,7 +34,6 @@ import java.math.RoundingMode;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static com.slibra.common.constant.MyConstants.*;
|
|
|
|
|
@@ -90,6 +88,9 @@ public class RyTask
|
|
|
// public final static StopWatch watch = new StopWatch("task");
|
|
|
public static final String[] queryTags = {"信义污水厂JS_COD_Value","信义污水厂JS_PH_Value","信义污水厂JS_SS_Value","信义污水厂JS_ZL_Value","信义污水厂JS_ZA_Value","信义污水厂JS_AD_Value","信义污水厂JS_T_Value","信义污水厂进水泵房液位","信义污水厂出水瞬时流量","信义污水厂升级出水COD","信义污水厂升级出水PH","信义污水厂升级出水SS","信义污水厂升级出水TN","信义污水厂升级出水TP","信义污水厂升级出水氨氮","信义污水厂AIT202_Value","信义污水厂AIT203_Value","信义污水厂AIT207_Value","信义污水厂AIT206_Value","信义污水厂AIT209_Value","信义污水厂AIT210_Value","信义污水厂进水TDS","信义污水厂FT101_Value","信义污水厂SWCHHYHLB1_R_Value","信义污水厂SWCHHYHLB2_R_Value","信义污水厂SWCHHYHLB3_R_Value","信义污水厂SWCHHYHLB4_R_Value","信义污水厂SWCHHYHLB5_R_Value","信义污水厂SWCHHYHLB6_R_Value","信义污水厂SWCWNHLB1_R_Value","信义污水厂SWCWNHLB2_R_Value","信义污水厂SWCWNHLB3_R_Value","信义污水厂SWCWNHLB4_R_Value","信义污水厂SWCWNHLB5_R_Value","信义污水厂GFJ1_R_Value","信义污水厂GFJ2_R_Value","信义污水厂GFJ3_R_Value","信义污水厂GFJ4_R_Value","信义污水厂GFJ5_R_Value","信义污水厂GFJ6_R_Value","信义污水厂GFJ1_KQLL_Value","信义污水厂GFJ2_KQLL_Value","信义污水厂GFJ3_KQLL_Value","信义污水厂GFJ4_KQLL_Value","信义污水厂GFJ5_KQLL_Value","信义污水厂GFJ6_KQLL_Value","信义污水厂实际碳源加药量","信义污水厂除磷加药瞬时流量", "信义污水厂_除磷P04预测值_"};
|
|
|
|
|
|
+
|
|
|
+ public static final String[] predictorArr = {"出水COD", "出水SS", "出水总氮", "出水总磷", "出水氨氮", "xsy1", "xsy2"};
|
|
|
+ public static final String[] predictorArrStr = {"出水COD", "出水SS", "出水总氮", "出水总磷", "出水氨氮", "硝酸盐#1", "硝酸盐#2"};
|
|
|
/**
|
|
|
* 定时从工业库获取数据
|
|
|
*
|
|
@@ -1170,6 +1171,243 @@ public class RyTask
|
|
|
log.info("定时生成简报任务结束~~~~~~~~~~~~~~");
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * 每小时发一一次预测
|
|
|
+ *
|
|
|
+ */
|
|
|
+ public void predictor(){
|
|
|
+ for (int i = 0; i < predictorArr.length; i++) {
|
|
|
+ String result = testPredictor(predictorArr[i]);
|
|
|
+ JSONObject jsonObject = JSON.parseObject(result);
|
|
|
+ String task = jsonObject.getString("task");
|
|
|
+ String hour = jsonObject.getString("hour");
|
|
|
+ String pred = jsonObject.getString("pred");
|
|
|
+ if(StringUtils.isNotBlank(pred) && pred.contains(",")){
|
|
|
+ String[] split = pred.split(",");
|
|
|
+ if(split.length != 3){
|
|
|
+ log.error("预测数据返回结果为{},长度不是3,无法正常解析", result);
|
|
|
+ //因为部分预测还不支持 所以不需要
|
|
|
+// handleXinYiWarningsYC(cscodBzz, split, cscodGkz, BusinessEnum.WarningCategoryEnum.CS_COD_YC.getCode(), hour, normConfig);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //解析数据 处理报警 调研prompt 保存等
|
|
|
+ this.handlePredictorWarning(split, hour, task);
|
|
|
+ }else {
|
|
|
+ log.error("预测数据返回结果为{},无法正常解析", result);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handlePredictorWarning(String[] split, String hour, String task) {
|
|
|
+ //获取配置表
|
|
|
+ List<TXinyiNormConfig> tXinyiNormConfigs = this.xinyiNormConfigMapper.selectTXinyiNormConfigList(null);
|
|
|
+ if(CollectionUtils.isEmpty(tXinyiNormConfigs)) {
|
|
|
+ log.error( "未查询到配置信息");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TXinyiNormConfig normConfig = tXinyiNormConfigs.get(0);
|
|
|
+ //获取最新的工业库的数据
|
|
|
+ TXinyiIndustry tXinyiIndustry = this.xinyiIndustryMapper.selectTXinyiIndustryNewest();
|
|
|
+ if("cod".equals(task)){
|
|
|
+ BigDecimal cscodBzz = normConfig.getCscodBzz();
|
|
|
+ BigDecimal cscodGkz = normConfig.getCscodGkz();
|
|
|
+ handleXinYiWarningsYC(cscodBzz, split, cscodGkz, BusinessEnum.WarningCategoryEnum.CS_COD_YC.getCode(), hour, normConfig,tXinyiIndustry.getCsCod(), tXinyiIndustry);
|
|
|
+ }else if("ss".equals(task)){
|
|
|
+
|
|
|
+ }else if("tn".equals(task)){
|
|
|
+
|
|
|
+ }else if("tp".equals(task)){
|
|
|
+
|
|
|
+ }else if("nh3".equals(task)){
|
|
|
+
|
|
|
+ }else if("xsy1".equals(task)){
|
|
|
+
|
|
|
+ }else if("xsy2".equals(task)){
|
|
|
+
|
|
|
+ }else {
|
|
|
+ log.error("暂未支持的类型{}", task);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleXinYiWarningsYC(BigDecimal csBzz, String[] split, BigDecimal csGkz, String category, String hour, TXinyiNormConfig normConfig, BigDecimal currentVal, TXinyiIndustry tXinyiIndustry) {
|
|
|
+ BigDecimal multiply = csBzz.multiply(new BigDecimal(MyConstants.SCALE_VALUE));
|
|
|
+ TXinyiWarningRecord tXinyiWarningRecord = null;
|
|
|
+ for (String forecast : split) {
|
|
|
+ BigDecimal forecastVal = new BigDecimal(forecast);
|
|
|
+ tXinyiWarningRecord = new TXinyiWarningRecord();
|
|
|
+ /*String category = BusinessEnum.WarningCategoryEnum.CS_AD.getCode();*/
|
|
|
+ tXinyiWarningRecord.setStatus(0);
|
|
|
+ tXinyiWarningRecord.setType(2);
|
|
|
+ tXinyiWarningRecord.setCategory(category);
|
|
|
+ tXinyiWarningRecord.setTime(DateUtils.getNowDate());
|
|
|
+ tXinyiWarningRecord.setWarningVal(currentVal);//当前值
|
|
|
+ tXinyiWarningRecord.setForecastVal(forecastVal);
|
|
|
+ tXinyiWarningRecord.setDesignVal(csBzz);
|
|
|
+ tXinyiWarningRecord.setControlVal(csGkz);
|
|
|
+ tXinyiWarningRecord.setCreateBy(WARNING_DEFAULT_CREATE);
|
|
|
+ tXinyiWarningRecord.setCreateTime(DateUtils.getNowDate());
|
|
|
+ tXinyiWarningRecord.setRemark("0");
|
|
|
+
|
|
|
+ //2024年5月25日17:52:33 如果工业库获取不到数据,也触发报警,但是不调用决策接口
|
|
|
+ if(Objects.isNull(forecastVal)){
|
|
|
+ tXinyiWarningRecord.setReason(category + EXCEPTION_WARNING_YC);
|
|
|
+ tXinyiWarningRecord.setLevel(WARNING_LEVEL_NO_DATE);
|
|
|
+ }else if(forecastVal.compareTo(multiply) > 0){//一级
|
|
|
+ tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING_YC);
|
|
|
+ tXinyiWarningRecord.setLevel(WARNING_LEVEL_ONE);
|
|
|
+ }else if(forecastVal.compareTo(csBzz) >= 0 && forecastVal.compareTo(multiply) <= 0){//二级
|
|
|
+ tXinyiWarningRecord.setReason(category + CHAOBIAO_WARNING_YC);
|
|
|
+ tXinyiWarningRecord.setLevel(WARNING_LEVEL_TWO);
|
|
|
+ }else if(!Objects.isNull(csGkz) && forecastVal.compareTo(csGkz) > 0){
|
|
|
+ tXinyiWarningRecord.setReason(category + CHAOGUANKONG_WARNING_YC);
|
|
|
+ tXinyiWarningRecord.setLevel(WARNING_LEVEL_THREE);
|
|
|
+ }else{
|
|
|
+ tXinyiWarningRecord = null;//这种的无需处理
|
|
|
+ }
|
|
|
+ if(!Objects.isNull(tXinyiWarningRecord)){
|
|
|
+ log.info("预测报警中出现了超标的情况,循环可以退出了");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //当前状态正常 需要查询历史有无正在报警的数据,如果有,将报警状态改完2(系统自动关闭)
|
|
|
+ List<TXinyiWarningRecord> tXinyiWarningRecords = this.xinyiWarningRecordMapper.selectTXinyiWarningRecordList(TXinyiWarningRecord.builder().delFlag(0).type(2).category(category).warningStatus(0).build());
|
|
|
+ if(Objects.isNull(tXinyiWarningRecord)){//数据正常,无告警信息
|
|
|
+ if(!CollectionUtils.isEmpty(tXinyiWarningRecords)){
|
|
|
+ log.info( "{}:现在恢复正常,历史报警数据为{}", category,JSON.toJSONString(tXinyiWarningRecords));
|
|
|
+ for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) {
|
|
|
+ xinyiWarningRecord.setStatus(2);
|
|
|
+ Date nowDate = DateUtils.getNowDate();
|
|
|
+ xinyiWarningRecord.setOffTime(nowDate);
|
|
|
+ xinyiWarningRecord.setUpdateTime(nowDate);
|
|
|
+ xinyiWarningRecord.setUpdateBy(WARNING_DEFAULT_CREATE);
|
|
|
+ this.xinyiWarningRecordMapper.updateTXinyiWarningRecord(xinyiWarningRecord);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else{//有告警信息
|
|
|
+ if(CollectionUtils.isEmpty(tXinyiWarningRecords)){//之前没有告警记录
|
|
|
+ //保存到数据库中
|
|
|
+ this.xinyiWarningRecordMapper.insertTXinyiWarningRecord(tXinyiWarningRecord);
|
|
|
+ if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){
|
|
|
+ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的
|
|
|
+ this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig);
|
|
|
+ }else {
|
|
|
+ //继续调用大模型prompt
|
|
|
+ this.askBigModelForYC(tXinyiWarningRecord, tXinyiIndustry, normConfig);
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ log.info("{}:之前已经有过告警记录了,且还是继续报警,无需重复添加报警,但是决策仍然要调用", category);
|
|
|
+ for (TXinyiWarningRecord xinyiWarningRecord : tXinyiWarningRecords) {//理论上只有一个的
|
|
|
+ if(WARNING_LEVEL_NO_DATE.equals(tXinyiWarningRecord.getLevel())){
|
|
|
+ //只保存一个普通的问答记录 不需要调用决策信息,但是实时数据还是要记录的
|
|
|
+ this.addChatRecordByWarning(tXinyiWarningRecord, tXinyiIndustry, normConfig);
|
|
|
+ }else {
|
|
|
+ //继续调用决策
|
|
|
+ this.askBigModelForYC(xinyiWarningRecord, tXinyiIndustry, normConfig);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void askBigModelForYC(TXinyiWarningRecord xinyiWarningRecord, TXinyiIndustry tXinyiIndustry, TXinyiNormConfig normConfig) {
|
|
|
+ log.info("预测进入了后台接口调⽤⼤模型获取问答结果处理(预测)");
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ String sessionId = IdUtils.simpleUUID();
|
|
|
+ ChatReq chatReq = new ChatReq();
|
|
|
+// String ipAddr = IpUtils.getIpAddr();//获取用户的ip地址 传给大模型
|
|
|
+ String ipAddr = "";//获取用户的ip地址 传给大模型 定时任务获取不到ip地址
|
|
|
+ int counts = 1;//默认是第一次
|
|
|
+ //这种问答 没有历史问答的概念 直接把问题扔进去就行 无需查询历史记录
|
|
|
+ List<String> historyDates = new ArrayList<>();
|
|
|
+ //构建问题(替换提示词中的占位符)
|
|
|
+ String shWarningPrompt = YC_WARNING_PROMPT;
|
|
|
+ shWarningPrompt =shWarningPrompt.replace("#{0}", xinyiWarningRecord.getReason());
|
|
|
+ shWarningPrompt =shWarningPrompt.replace("#{1}", String.valueOf(xinyiWarningRecord.getDesignVal()));
|
|
|
+ shWarningPrompt =shWarningPrompt.replace("#{2}", String.valueOf(xinyiWarningRecord.getWarningVal()));
|
|
|
+ shWarningPrompt =shWarningPrompt.replace("#{3}", String.valueOf(xinyiWarningRecord.getForecastVal()));
|
|
|
+ historyDates.add(shWarningPrompt);
|
|
|
+ // 获取输出流
|
|
|
+ ManagedChannel channel = null;
|
|
|
+ try {
|
|
|
+ channel = ManagedChannelBuilder.forAddress("10.0.0.24", 17070)
|
|
|
+ .usePlaintext()
|
|
|
+ .build();
|
|
|
+ InferenceAPIsServiceGrpc.InferenceAPIsServiceBlockingStub stub = InferenceAPIsServiceGrpc.newBlockingStub(channel);
|
|
|
+ String dataJson = "{\"bot_id\":\"721\",\"exp_id\":\"721\",\"session_id\":\"" + sessionId + "\",\"use_rag\":\"true\",\"prompt\":\"你是⼀个资深⽔务领域专家,能回答各种⽔务相关问题\",\"history_dia\":" + JSON.toJSONString(historyDates) + ",\"generate_args\":{\"max_new_tokens\":2048,\"max_length\":4096,\"num_beams\":1,\"do_sample\":true,\"top_p\":0.7,\"temperature\":0.95},\"extra\":{ \"ip_address\": \"" + ipAddr + "\" },\"strengthen\":" + (true) + "}";
|
|
|
+ log.info("请求大模型的问答参数为{}", dataJson);
|
|
|
+ PredictionsRequest request = PredictionsRequest.newBuilder()
|
|
|
+ .setModelName("slibra_bot")
|
|
|
+ .putInput("method", ByteString.copyFrom("infer_stream", "utf-8"))//推理
|
|
|
+ .putInput("data", ByteString.copyFrom(dataJson, "utf-8"))
|
|
|
+ .buildPartial();
|
|
|
+ Iterator<PredictionResponse> predictions = stub.streamPredictions(request);
|
|
|
+ //将结果记录到问答表
|
|
|
+ while (predictions.hasNext()) {
|
|
|
+ String responseStr = predictions.next().getPrediction().toStringUtf8();
|
|
|
+ log.info("大模型问答返回的原始结果为{}", responseStr);
|
|
|
+ responseStr = JSON.parseObject(responseStr).getString("message");
|
|
|
+ if("complete".equals(responseStr)){
|
|
|
+ log.info("结尾语句并且是非JSON,无需处理");
|
|
|
+ }else{
|
|
|
+ sb.append(responseStr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //将问答更新到数据库中
|
|
|
+ chatReq.setSessionId(sessionId);
|
|
|
+ chatReq.setType(3);//0问答 1决策 2本地 3仿真预测
|
|
|
+ chatReq.setModule(3);//0专家问答 1智能工单 2智能体助手 3告警 4简报
|
|
|
+ //todo 这里不需要再查询数据,让前端展示表格了,大模型做了该功能。
|
|
|
+ String showVal = this.buildShowValue(xinyiWarningRecord, tXinyiIndustry, normConfig);
|
|
|
+ chatReq.setShowVal(showVal);
|
|
|
+ chatReq.setQuestion(shWarningPrompt);
|
|
|
+ chatReq.setAnswer(sb.toString());
|
|
|
+ chatReq.setWarningId(String.valueOf(xinyiWarningRecord.getId()));
|
|
|
+ chatReq.setCounts(counts);//问答次数
|
|
|
+ chatReq.setUserId(WARNING_DEFAULT_CREATE);
|
|
|
+ chatReq.setCreateBy(WARNING_DEFAULT_CREATE);
|
|
|
+ chatReq.setCreateTime(DateUtils.getNowDate());
|
|
|
+ this.xinyiChatRecordMapper.insertTXinyiChatRecord(chatReq);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ } finally {
|
|
|
+ // 关闭输出流
|
|
|
+ channel.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public static String testPredictor(String type){
|
|
|
+ // 获取输出流
|
|
|
+ ManagedChannel channel = null;
|
|
|
+ try {
|
|
|
+ channel = ManagedChannelBuilder.forAddress("10.0.0.24", 17070)
|
|
|
+ .usePlaintext()
|
|
|
+ .build();
|
|
|
+ InferenceAPIsServiceGrpc.InferenceAPIsServiceBlockingStub stub = InferenceAPIsServiceGrpc.newBlockingStub(channel);
|
|
|
+ String dataJson = "{\"bot_id\":\"b00001\",\"exp_id\":\"721\",\"norm\":\"" + type + "\",\"session_id\":\" " + IdUtils.simpleUUID() + " \",\"extra\":{}}";
|
|
|
+ log.info("请求大模型的预测的参数为{}", dataJson);
|
|
|
+ PredictionsRequest request = PredictionsRequest.newBuilder()
|
|
|
+ .setModelName("slibra_bot")
|
|
|
+ .putInput("method", ByteString.copyFrom("predictor", "utf-8"))//推理
|
|
|
+ .putInput("data", ByteString.copyFrom(dataJson, "utf-8"))
|
|
|
+ .buildPartial();
|
|
|
+ Iterator<PredictionResponse> predictions = stub.streamPredictions(request);
|
|
|
+ //将结果记录到问答表
|
|
|
+ String responseStr = predictions.next().getPrediction().toStringUtf8();
|
|
|
+ log.info("大模型的预测的返回结果为{}", responseStr);
|
|
|
+// return JSON.parseObject(responseStr).getString("pred");
|
|
|
+ return responseStr;
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ } finally {
|
|
|
+ assert channel != null;
|
|
|
+ channel.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
*
|
|
|
* 2022/01/01 转成2022年01月01日 数据
|