package com.billion.main.plc.sub; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.billion.framework.websocket.WebSocketUsers; import com.billion.main.da.domain.DaFailRecord; import com.billion.main.da.domain.DaParamCollection; import com.billion.main.da.service.IDaFailRecordService; import com.billion.main.da.service.IDaParamCollectionService; import com.billion.main.da.service.IDaStationCollectionService; import com.billion.main.plc.constant.Constants; import com.billion.main.sc.domain.ScCollectionParamConf; import com.billion.main.sc.service.IScCollectionParamConfService; import com.kangaroohy.milo.model.ReadWriteEntity; import com.kangaroohy.milo.runner.subscription.SubscriptionCallback; import com.kangaroohy.milo.service.MiloService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.websocket.Session; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.lang.reflect.Field; @Slf4j @Component public class OPCUaSubscription implements SubscriptionCallback { public static Map map = WebSocketUsers.getUsers(); public static MiloService miloService; public static IScCollectionParamConfService collectionParamConfService; public static IDaParamCollectionService paramCollectionService; public static IDaStationCollectionService stationCollectionService; public static IDaFailRecordService failRecordService; public OPCUaSubscription(MiloService miloService, IScCollectionParamConfService collectionParamConfService , IDaParamCollectionService paramCollectionService, IDaStationCollectionService stationCollectionService , IDaFailRecordService failRecordService) { OPCUaSubscription.miloService = miloService; OPCUaSubscription.collectionParamConfService = collectionParamConfService; OPCUaSubscription.paramCollectionService = paramCollectionService; OPCUaSubscription.stationCollectionService = stationCollectionService; OPCUaSubscription.failRecordService = failRecordService; } @Override public void onSubscribe(String identifier, Object value) { log.info("地址:"+identifier+"值:"+value); try { if(null != value && !Constants.ZERO.equals(value.toString())) { String[] nodes = identifier.split("[.]"); String thoroughfare = nodes[0];//通道 String device = nodes[1];//设备 String tab = nodes[2];//标记 String valueString = value.toString();//地址值 CompletableFuture cp1 = CompletableFuture.runAsync(() -> { subHandle(thoroughfare,device,tab,valueString); }); } } catch (Exception e) { log.error(e.getMessage()); } } public void subHandle(String thoroughfare,String device,String tab,String valueString){ try{ //监听recordData if(tab.equals(Constants.RECORD_DATA)){ //如果是2做出站处理 if(valueString.equals(Constants.TWO)){ Integer result = Constants.INTEGER_TWO_ONE; result = saveParamCollection(device, "snCode", "stationStatus"); miloService.writeToOpcShort(ReadWriteEntity.builder().identifier(thoroughfare + "." + device + ".RecordDataDone").value(result).build()); // miloService.writeToOpcUa(ReadWriteEntity.builder().identifier(thoroughfare + "." + device + ".RecordDataDone").value("21").build()); } } }catch (Exception e) { log.error(e.getMessage()); } } /** * 保存参数数据和发送工厂MES * @param device 工位 * @param snCode 产品SN * @param stationStatus 站状态 * @return result * @throws Exception e */ private static Integer saveParamCollection(String device, String snCode, String stationStatus) { Integer result = Constants.INTEGER_TWO_ONE; try { // 查询参数配置表 List list = collectionParamConfService.list(new LambdaQueryWrapper() .eq(ScCollectionParamConf::getLocationCode, device) .eq(ScCollectionParamConf::getSubscribe, Constants.ONE) .orderByDesc(ScCollectionParamConf::getOrderNum) ); if (CollUtil.isNotEmpty(list)) { Object sfcCodeObject = miloService.readFromOpcUa( "wc." + device + ".sfcCode").getValue(); // 获取参数值 List collect = list.stream() .map(ScCollectionParamConf::getNode) .collect(Collectors.toList()); List readWriteEntityList = miloService.readFromOpcUa(collect); for (int i = 0; i < readWriteEntityList.size(); i++) { ReadWriteEntity entity = readWriteEntityList.get(i); if (ObjectUtil.isNull(entity.getValue())) { log.error("读取到的值为空,跳过保存操作。设备:{},参数:{}", device, list.get(i).getParamCode()); return Constants.INTEGER_TWO_TWO; } } // 保存采集数据 saveCollectionData(device, sfcCodeObject.toString(), list, readWriteEntityList); // 创建工位对象并赋值 Object stationObject = createStationObject(device, list, readWriteEntityList); if (stationObject != null) { System.out.println("工位" + device + "对象创建成功:" + stationObject); // 这里可以对创建的对象进行进一步处理 handleMesRequest(stationObject, device); } } } catch (Exception e) { log.error("保存数据异常", e); throw new RuntimeException("保存数据发送工厂MES异常"); } return result; } /** * 保存采集数据 */ private static void saveCollectionData(String device, String snCode, List list, List readWriteEntityList) { ArrayList collectionList = new ArrayList<>(); for (int i = 0; i < readWriteEntityList.size(); i++) { DaParamCollection daParamCollection = new DaParamCollection(); daParamCollection.setSfcCode(snCode); daParamCollection.setLocationCode(device); daParamCollection.setParamCode(list.get(i).getParamCode()); daParamCollection.setParamName(list.get(i).getParamName()); daParamCollection.setParamValue(readWriteEntityList.get(i).getValue().toString().trim()); daParamCollection.setCollectTime(new Date()); collectionList.add(daParamCollection); } paramCollectionService.insertBatch(collectionList); } /** * 创建工位对象并赋值 */ private static Object createStationObject(String device, List list, List readWriteEntityList) { try { // 构建完整的类名 String className = "com.billion.main.api.domain." + device; Class stationClass = Class.forName(className); Object stationObject = stationClass.newInstance(); // 获取所有属性 Field[] fields = stationClass.getDeclaredFields(); // 创建参数值Map,方便查找 Map paramValueMap = new HashMap<>(); for (int i = 0; i < list.size(); i++) { paramValueMap.put(list.get(i).getParamCode(), readWriteEntityList.get(i).getValue().toString()); } // 按顺序给属性赋值 for (Field field : fields) { field.setAccessible(true); String paramCode = field.getName(); // 假设属性名与参数编码一致 String value = paramValueMap.get(paramCode); if (value != null) { // 根据字段类型转换值 Object convertedValue = convertValue(value, field.getType()); field.set(stationObject, convertedValue); } } return stationObject; } catch (Exception e) { log.error("创建工位对象失败:" + device, e); return null; } } /** * 根据字段类型转换值 */ private static Object convertValue(String value, Class type) { if (type == String.class) { return value.trim(); } else if (type == Integer.class || type == int.class) { return Integer.parseInt(value); } else if (type == Double.class || type == double.class) { return Double.parseDouble(value); } else if (type == Float.class || type == float.class) { return Float.parseFloat(value); } else if (type == Long.class || type == long.class) { return Long.parseLong(value); } else if (type == Boolean.class || type == boolean.class) { return Boolean.parseBoolean(value); } else if (type == Byte.class || type == byte.class) { return Byte.parseByte(value); } else if (type == Date.class) { try { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.parse(value); } catch (ParseException e) { return null; } } return value; } private static void handleMesRequest(Object stationObject, String device) { try { System.out.println("stationObject"+stationObject+"-------------------device"+device); WebSocketUsers.sendMessageToUserByText(map.get("IGBT"), "21"); // 发送HTTP请求 // HttpResponse response = HttpRequest.post(Constants.WC_TEST_MES_URL) // .header("token", "XNY") // .body(JSONUtil.toJsonStr(stationObject)) // .timeout(5000) // 设置超时时间 // .execute(); // System.out.println("--------------------------"+response.body()); // log.info("发送潍柴MES结果{}", response.body()); // // 解析响应 // if (response.isOk()) { // 请求成功 // JSONObject jsonResponse = JSONUtil.parseObj(response.body()); // Integer code = jsonResponse.getInt("code"); // // if (code == 200) { // // 响应不是200,保存失败记录 // saveMesFailRecord(stationObject, device, "响应正常:" + code, "1"); // }else { // saveMesFailRecord(stationObject, device, "响应异常:" + code,Constants.ZERO); // } // } else { // // HTTP请求失败,保存失败记录 // saveMesFailRecord(stationObject, device, "HTTP状态码:" + response.getStatus(),Constants.ZERO); // } } catch (Exception e) { // 发生异常(比如连接超时、服务不可用等),保存失败记录 saveMesFailRecord(stationObject, device, "请求异常:" + e.getMessage(),Constants.ZERO); } } /** * 保存MES推送失败记录 */ private static void saveMesFailRecord(Object stationObject, String device, String errorMsg, String status) { try { DaFailRecord failRecord = new DaFailRecord(); failRecord.setDeviceCode(device); failRecord.setRequestData(JSONUtil.toJsonStr(stationObject)); failRecord.setErrorMsg(errorMsg); failRecord.setCreateTime(new Date()); failRecord.setStatus(status); // 0:未处理 1:已处理 failRecord.setRetryCount(0); // 重试次数 // 保存到数据库 failRecordService.save(failRecord); if(status.equals("1")){ log.error("MES推送成功,已保存成功记录。设备:{},报文:{}", device, errorMsg); }else { log.error("MES推送失败,已保存失败记录。设备:{},错误:{}", device, errorMsg); } } catch (Exception e) { log.error("保存MES失败记录时发生错误", e); } } // @Scheduled(fixedDelay = 300000) // 5分钟执行一次 // public void retryFailedRecords() { // List failRecords = failRecordService.findUnhandledRecords(); // for (MesFailRecord record : failRecords) { // // 重试逻辑 // } // } @Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行 public void cleanOldRecords() { try { // 清理已处理的历史数据 List unHandledList = failRecordService.list(new LambdaQueryWrapper().eq(DaFailRecord::getStatus, "0")); for (DaFailRecord daFailRecord : unHandledList) { HttpResponse response = HttpRequest.post(Constants.WC_TEST_MES_URL) .body(JSONUtil.toJsonStr(daFailRecord.getRequestData())) .timeout(5000) // 设置超时时间 .execute(); // 解析响应 if (response.isOk()) { // 请求成功 JSONObject jsonResponse = JSONUtil.parseObj(response.body()); Integer code = jsonResponse.getInt("code"); if (code == 200) { failRecordService.deleteDaFailRecordById(daFailRecord.getId()); }else { this.updateFailRecordRetryCount(daFailRecord); } } else { // HTTP请求失败,保存失败记录 this.updateFailRecordRetryCount(daFailRecord); } } } catch (Exception e) { log.error("定时重发MES失败记录时发生错误", e); } } public void updateFailRecordRetryCount(DaFailRecord failRecord){ failRecord.setRetryCount(failRecord.getRetryCount()+1); boolean b = failRecordService.saveOrUpdate(failRecord); } }