package com.billion.main.plc.sub;
|
|
|
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.http.HttpRequest;
|
import cn.hutool.http.HttpResponse;
|
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONUtil;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.billion.framework.websocket.WebSocketUsers;
|
import com.billion.main.da.domain.DaFailRecord;
|
import com.billion.main.da.domain.DaParamCollection;
|
import com.billion.main.da.service.IDaFailRecordService;
|
import com.billion.main.da.service.IDaParamCollectionService;
|
import com.billion.main.da.service.IDaStationCollectionService;
|
import com.billion.main.plc.constant.Constants;
|
import com.billion.main.sc.domain.ScCollectionParamConf;
|
import com.billion.main.sc.service.IScCollectionParamConfService;
|
import com.kangaroohy.milo.model.ReadWriteEntity;
|
import com.kangaroohy.milo.runner.subscription.SubscriptionCallback;
|
import com.kangaroohy.milo.service.MiloService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import javax.websocket.Session;
|
import java.text.ParseException;
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.concurrent.CompletableFuture;
|
import java.util.stream.Collectors;
|
import java.lang.reflect.Field;
|
|
|
@Slf4j
|
@Component
|
public class OPCUaSubscription implements SubscriptionCallback {
|
public static Map<String, Session> map = WebSocketUsers.getUsers();
|
|
public static MiloService miloService;
|
public static IScCollectionParamConfService collectionParamConfService;
|
public static IDaParamCollectionService paramCollectionService;
|
public static IDaStationCollectionService stationCollectionService;
|
public static IDaFailRecordService failRecordService;
|
|
public OPCUaSubscription(MiloService miloService, IScCollectionParamConfService collectionParamConfService
|
, IDaParamCollectionService paramCollectionService, IDaStationCollectionService stationCollectionService
|
, IDaFailRecordService failRecordService) {
|
OPCUaSubscription.miloService = miloService;
|
OPCUaSubscription.collectionParamConfService = collectionParamConfService;
|
OPCUaSubscription.paramCollectionService = paramCollectionService;
|
OPCUaSubscription.stationCollectionService = stationCollectionService;
|
OPCUaSubscription.failRecordService = failRecordService;
|
}
|
|
@Override
|
public void onSubscribe(String identifier, Object value) {
|
log.info("地址:"+identifier+"值:"+value);
|
try {
|
if(null != value && !Constants.ZERO.equals(value.toString())) {
|
String[] nodes = identifier.split("[.]");
|
String thoroughfare = nodes[0];//通道
|
String device = nodes[1];//设备
|
String tab = nodes[2];//标记
|
String valueString = value.toString();//地址值
|
|
CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
|
subHandle(thoroughfare,device,tab,valueString);
|
});
|
|
}
|
} catch (Exception e) {
|
log.error(e.getMessage());
|
}
|
}
|
|
public void subHandle(String thoroughfare,String device,String tab,String valueString){
|
try{
|
//监听recordData
|
if(tab.equals(Constants.RECORD_DATA)){
|
//如果是2做出站处理
|
if(valueString.equals(Constants.TWO)){
|
Integer result = Constants.INTEGER_TWO_ONE;
|
result = saveParamCollection(device, "snCode", "stationStatus");
|
miloService.writeToOpcShort(ReadWriteEntity.builder().identifier(thoroughfare + "." + device + ".RecordDataDone").value(result).build());
|
// miloService.writeToOpcUa(ReadWriteEntity.builder().identifier(thoroughfare + "." + device + ".RecordDataDone").value("21").build());
|
}
|
}
|
}catch (Exception e) {
|
log.error(e.getMessage());
|
}
|
}
|
|
|
/**
|
* 保存参数数据和发送工厂MES
|
* @param device 工位
|
* @param snCode 产品SN
|
* @param stationStatus 站状态
|
* @return result
|
* @throws Exception e
|
*/
|
private static Integer saveParamCollection(String device, String snCode, String stationStatus) {
|
Integer result = Constants.INTEGER_TWO_ONE;
|
try {
|
// 查询参数配置表
|
List<ScCollectionParamConf> list = collectionParamConfService.list(new LambdaQueryWrapper<ScCollectionParamConf>()
|
.eq(ScCollectionParamConf::getLocationCode, device)
|
.eq(ScCollectionParamConf::getSubscribe, Constants.ONE)
|
.orderByDesc(ScCollectionParamConf::getOrderNum)
|
);
|
|
if (CollUtil.isNotEmpty(list)) {
|
Object sfcCodeObject = miloService.readFromOpcUa( "wc." + device + ".sfcCode").getValue();
|
|
// 获取参数值
|
List<String> collect = list.stream()
|
.map(ScCollectionParamConf::getNode)
|
.collect(Collectors.toList());
|
List<ReadWriteEntity> readWriteEntityList = miloService.readFromOpcUa(collect);
|
|
for (int i = 0; i < readWriteEntityList.size(); i++) {
|
ReadWriteEntity entity = readWriteEntityList.get(i);
|
if (ObjectUtil.isNull(entity.getValue())) {
|
log.error("读取到的值为空,跳过保存操作。设备:{},参数:{}", device, list.get(i).getParamCode());
|
return Constants.INTEGER_TWO_TWO;
|
}
|
}
|
|
// 保存采集数据
|
saveCollectionData(device, sfcCodeObject.toString(), list, readWriteEntityList);
|
|
// 创建工位对象并赋值
|
Object stationObject = createStationObject(device, list, readWriteEntityList);
|
if (stationObject != null) {
|
System.out.println("工位" + device + "对象创建成功:" + stationObject);
|
// 这里可以对创建的对象进行进一步处理
|
handleMesRequest(stationObject, device);
|
}
|
}
|
|
} catch (Exception e) {
|
log.error("保存数据异常", e);
|
throw new RuntimeException("保存数据发送工厂MES异常");
|
}
|
return result;
|
}
|
|
/**
|
* 保存采集数据
|
*/
|
private static void saveCollectionData(String device, String snCode,
|
List<ScCollectionParamConf> list, List<ReadWriteEntity> readWriteEntityList) {
|
ArrayList<DaParamCollection> collectionList = new ArrayList<>();
|
for (int i = 0; i < readWriteEntityList.size(); i++) {
|
DaParamCollection daParamCollection = new DaParamCollection();
|
daParamCollection.setSfcCode(snCode);
|
daParamCollection.setLocationCode(device);
|
daParamCollection.setParamCode(list.get(i).getParamCode());
|
daParamCollection.setParamName(list.get(i).getParamName());
|
daParamCollection.setParamValue(readWriteEntityList.get(i).getValue().toString().trim());
|
daParamCollection.setCollectTime(new Date());
|
collectionList.add(daParamCollection);
|
}
|
paramCollectionService.insertBatch(collectionList);
|
}
|
|
/**
|
* 创建工位对象并赋值
|
*/
|
private static Object createStationObject(String device,
|
List<ScCollectionParamConf> list, List<ReadWriteEntity> readWriteEntityList) {
|
try {
|
// 构建完整的类名
|
String className = "com.billion.main.api.domain." + device;
|
Class<?> stationClass = Class.forName(className);
|
Object stationObject = stationClass.newInstance();
|
|
// 获取所有属性
|
Field[] fields = stationClass.getDeclaredFields();
|
|
// 创建参数值Map,方便查找
|
Map<String, String> paramValueMap = new HashMap<>();
|
for (int i = 0; i < list.size(); i++) {
|
paramValueMap.put(list.get(i).getParamCode(),
|
readWriteEntityList.get(i).getValue().toString());
|
}
|
|
// 按顺序给属性赋值
|
for (Field field : fields) {
|
field.setAccessible(true);
|
String paramCode = field.getName(); // 假设属性名与参数编码一致
|
String value = paramValueMap.get(paramCode);
|
|
if (value != null) {
|
// 根据字段类型转换值
|
Object convertedValue = convertValue(value, field.getType());
|
field.set(stationObject, convertedValue);
|
}
|
}
|
|
return stationObject;
|
} catch (Exception e) {
|
log.error("创建工位对象失败:" + device, e);
|
return null;
|
}
|
}
|
|
/**
|
* 根据字段类型转换值
|
*/
|
private static Object convertValue(String value, Class<?> type) {
|
if (type == String.class) {
|
return value.trim();
|
} else if (type == Integer.class || type == int.class) {
|
return Integer.parseInt(value);
|
} else if (type == Double.class || type == double.class) {
|
return Double.parseDouble(value);
|
} else if (type == Float.class || type == float.class) {
|
return Float.parseFloat(value);
|
} else if (type == Long.class || type == long.class) {
|
return Long.parseLong(value);
|
} else if (type == Boolean.class || type == boolean.class) {
|
return Boolean.parseBoolean(value);
|
} else if (type == Byte.class || type == byte.class) {
|
return Byte.parseByte(value);
|
} else if (type == Date.class) {
|
try {
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
return sdf.parse(value);
|
} catch (ParseException e) {
|
return null;
|
}
|
}
|
return value;
|
}
|
|
private static void handleMesRequest(Object stationObject, String device) {
|
try {
|
System.out.println("stationObject"+stationObject+"-------------------device"+device);
|
WebSocketUsers.sendMessageToUserByText(map.get("IGBT"), "21");
|
|
// 发送HTTP请求
|
// HttpResponse response = HttpRequest.post(Constants.WC_TEST_MES_URL)
|
// .header("token", "XNY")
|
// .body(JSONUtil.toJsonStr(stationObject))
|
// .timeout(5000) // 设置超时时间
|
// .execute();
|
// System.out.println("--------------------------"+response.body());
|
// log.info("发送潍柴MES结果{}", response.body());
|
// // 解析响应
|
// if (response.isOk()) { // 请求成功
|
// JSONObject jsonResponse = JSONUtil.parseObj(response.body());
|
// Integer code = jsonResponse.getInt("code");
|
//
|
// if (code == 200) {
|
// // 响应不是200,保存失败记录
|
// saveMesFailRecord(stationObject, device, "响应正常:" + code, "1");
|
// }else {
|
// saveMesFailRecord(stationObject, device, "响应异常:" + code,Constants.ZERO);
|
// }
|
// } else {
|
// // HTTP请求失败,保存失败记录
|
// saveMesFailRecord(stationObject, device, "HTTP状态码:" + response.getStatus(),Constants.ZERO);
|
// }
|
} catch (Exception e) {
|
// 发生异常(比如连接超时、服务不可用等),保存失败记录
|
saveMesFailRecord(stationObject, device, "请求异常:" + e.getMessage(),Constants.ZERO);
|
}
|
}
|
|
/**
|
* 保存MES推送失败记录
|
*/
|
private static void saveMesFailRecord(Object stationObject, String device, String errorMsg, String status) {
|
try {
|
DaFailRecord failRecord = new DaFailRecord();
|
failRecord.setDeviceCode(device);
|
failRecord.setRequestData(JSONUtil.toJsonStr(stationObject));
|
failRecord.setErrorMsg(errorMsg);
|
failRecord.setCreateTime(new Date());
|
failRecord.setStatus(status); // 0:未处理 1:已处理
|
failRecord.setRetryCount(0); // 重试次数
|
|
// 保存到数据库
|
failRecordService.save(failRecord);
|
if(status.equals("1")){
|
log.error("MES推送成功,已保存成功记录。设备:{},报文:{}", device, errorMsg);
|
}else {
|
log.error("MES推送失败,已保存失败记录。设备:{},错误:{}", device, errorMsg);
|
}
|
} catch (Exception e) {
|
log.error("保存MES失败记录时发生错误", e);
|
}
|
}
|
|
// @Scheduled(fixedDelay = 300000) // 5分钟执行一次
|
// public void retryFailedRecords() {
|
// List<DaFailRecord> failRecords = failRecordService.findUnhandledRecords();
|
// for (MesFailRecord record : failRecords) {
|
// // 重试逻辑
|
// }
|
// }
|
|
@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
|
public void cleanOldRecords() {
|
try {
|
// 清理已处理的历史数据
|
List<DaFailRecord> unHandledList = failRecordService.list(new LambdaQueryWrapper<DaFailRecord>().eq(DaFailRecord::getStatus, "0"));
|
for (DaFailRecord daFailRecord : unHandledList) {
|
HttpResponse response = HttpRequest.post(Constants.WC_TEST_MES_URL)
|
.body(JSONUtil.toJsonStr(daFailRecord.getRequestData()))
|
.timeout(5000) // 设置超时时间
|
.execute();
|
// 解析响应
|
if (response.isOk()) { // 请求成功
|
JSONObject jsonResponse = JSONUtil.parseObj(response.body());
|
Integer code = jsonResponse.getInt("code");
|
if (code == 200) {
|
failRecordService.deleteDaFailRecordById(daFailRecord.getId());
|
}else {
|
this.updateFailRecordRetryCount(daFailRecord);
|
}
|
} else {
|
// HTTP请求失败,保存失败记录
|
this.updateFailRecordRetryCount(daFailRecord);
|
}
|
}
|
} catch (Exception e) {
|
log.error("定时重发MES失败记录时发生错误", e);
|
}
|
}
|
|
public void updateFailRecordRetryCount(DaFailRecord failRecord){
|
failRecord.setRetryCount(failRecord.getRetryCount()+1);
|
boolean b = failRecordService.saveOrUpdate(failRecord);
|
}
|
|
}
|