ifishSocket/src/main/java/com/ifish/socketNew/SomeServer.java

520 lines
20 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.ifish.socketNew;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ifish.entity.*;
import com.ifish.entity.event.QueueEventBody;
import com.ifish.entity.event.QueueEventEntity;
import com.ifish.enums.BooleanEnum;
import com.ifish.enums.PushTypeEnum;
import com.ifish.netease.NeteaseIM;
import com.ifish.quartz.JobGroup;
import com.ifish.quartz.ScheduleJob;
import com.ifish.service.DeviceService;
import com.ifish.service.UserService;
import com.ifish.socketNew.model.receive.*;
import com.ifish.socketNew.model.send.*;
import com.ifish.socketNew.util.OrderModel;
import com.ifish.util.ByteUtil;
import com.ifish.util.IfishUtil;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 这个类主要是通过spring注入给MinaServerHandler用的, 表示: mina在接收到信息后,该由主要业务类来处理 服务器处理类
**/
public class SomeServer {
@Autowired
private UserService userService;
@Autowired
private DeviceService deviceService;
@Autowired
private ScheduleJob scheduleJob;
@Autowired
private NeteaseIM neteaseIM;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination ifish7PushQueueDestination;
//是否回复心跳
public static boolean isReplay = true;
private static Logger log = LoggerFactory.getLogger(SomeServer.class);
public static final ConcurrentHashMap<String, String> remoteAddress = new ConcurrentHashMap<String, String>();
public static final ConcurrentHashMap<String, IoSession> sessions_cz = new ConcurrentHashMap<String, IoSession>();
private ConcurrentHashMap<String, CopyOnWriteArraySet<IoSession>> sessions_sjs = new ConcurrentHashMap<String, CopyOnWriteArraySet<IoSession>>();
private volatile Map<String,Integer> heaterPushStatus = new HashMap<>();
/**
* 过滤收到的数据 服务器收到数据,进行协议类型匹配,如果不匹配,则不执行相应操作。
* @param session
* @param message
*/
public void doSome(IoSession session, Object message) {
//通配字符串
if(message instanceof byte[]){
byte[] bytes = (byte[]) message;
//长度
int length = bytes[14] & 0xff;
IoBuffer ioBuffer = ByteUtil.byteToIoBuffer(bytes, length);
byte type = ioBuffer.get();
// int Check_code = (int) ioBuffer.get();
//mac地址
byte[] srcByte = new byte[6];
for (int i = 2; i < 8; i++) {
srcByte[i-2] = bytes[i];
}
String strSrc = ByteUtil.bytesToHexString(srcByte);
//如果为设备端返回 发送给app端
if(type == 1){
sendToApp(session,strSrc, message);
}
else{
sendToDevice(session,strSrc, message);
}
} else if (message instanceof BackFunctionCodeHeater) {
//智能加热棒 获取服务器响应数据 以小时分开 mac地址唯一 最多24条数据
BackFunctionCodeHeater heater = (BackFunctionCodeHeater) message;
String macAddress = ByteUtil.bytesToHexString(heater.getSrc());
String hour = String.valueOf(Calendar.getInstance().get(Calendar.HOUR_OF_DAY));
List<DeviceHeater> deviceHeaters = deviceService.getDeviceHeaterByProperty(hour, macAddress,IfishUtil.format1(new Date()));
if (deviceHeaters.size() > 0) {
DeviceHeater deviceHeater = deviceHeaters.get(0);
deviceHeater.setHeaterWaterTemperature(String.valueOf(heater.getWaterTemperature()));
deviceHeater.setHeaterPh(String.valueOf(heater.getPh()));
deviceService.update(deviceHeater);
} else {
deviceService.save(heater);
}
//功能码
if (heater.getCheck_code() == 5 || heater.getCheck_code() == 6 || heater.getCheck_code() == 8) {
Integer status = heaterPushStatus.get(macAddress);
//当前水流量不正常 并且更新过正常水流标记 或者是第一次出现没有水流
if (heater.getPh() != 499 && ((status != null && status.intValue() == 2) || status == null)) {
//已推送标记
heaterPushStatus.put(macAddress, 1);
log.info(heater.toString());
pushNotifcationByPh(macAddress,PushTypeEnum.ph_warn.getValue(),"没有水流, 请及时查看!");
} else {
if (heater.getPh() == 499) {
heaterPushStatus.put(macAddress, 2);
}
}
}
//设备重新连接上,则移除延时推送的任务
JobGroup jobGroup = new JobGroup();
jobGroup.setJobName(macAddress);
jobGroup.setTriggerName(macAddress);
scheduleJob.deleteJob(jobGroup);
sendToApp(session,macAddress,heater.getByteMessage());
} else if (message instanceof BackBytes45) {
BackBytes45 backBytes45 = (BackBytes45) message;
String macAddress = ByteUtil.bytesToHexString(backBytes45.getSrc());
if (backBytes45.getCheck_code() == 3 ||
backBytes45.getCheck_code() == 5 ||
backBytes45.getCheck_code() == 6 ||
backBytes45.getCheck_code() == 8) {
//需要推送预警
if (backBytes45.getHumidity() == 2) {
log.info("macAddress : " + macAddress + ", 水位预警 : " + backBytes45);
pushNotifcationByPh(macAddress,PushTypeEnum.water_warn.getValue(),"水位预警, 请及时查看!");
}
}
sendToApp(session, macAddress, backBytes45.getByteMessage());
}
//设置设备温度报警
else if(message instanceof OrderFunctionCode5) {
OrderFunctionCode5 receive = (OrderFunctionCode5) message;
String strDest = ByteUtil .bytesToHexString(receive.getDest());
sendToDevice(session,strDest, message);
try {
Integer Onoff = receive.getOnoff() & 0xff;
if(Onoff==1 || Onoff==0){
deviceService.updateWarnOnoff(strDest,Onoff.toString());
}
else{
log.error("error Onoff:{},macAddress:{}",Onoff,strDest);
}
} catch (Exception e) {
log.error("set OrderSetWarnModel error:macAddress:{},error msg:{}",strDest,e.toString());
}
}
//设备温度报警
else if(message instanceof OrderFunctionCode9) {
OrderFunctionCode9 receive = (OrderFunctionCode9) message;
String strSrc = ByteUtil.bytesToHexString(receive.getSrc());
//推送类型
Integer warnType = receive.getWarn_type() & 0xff;
if(warnType==0){
log.info("mac : " + strSrc + ", 低温预警 : " + receive);
pushNotifcation(strSrc,PushTypeEnum.wendu_warn.getValue(),"温度达到"+(float)receive.getWendu()/10f+"℃,已低于"+(float)receive.getWarn_wendu()/10f+"℃,请及时查看!");
}
else if(warnType==1){
log.info("mac : " + strSrc + ", 高温预警 : " + receive);
pushNotifcation(strSrc,PushTypeEnum.wendu_warn.getValue(),"温度达到"+(float)receive.getWendu()/10f+"℃,已高于"+(float)receive.getWarn_wendu()/10f+"℃,请及时查看!");
}
else{
log.error("error warn_type:{},macAddress:{}",warnType,strSrc);
}
}
//设备登录服务器
else if(message instanceof OrderFunctionCode1) {
OrderFunctionCode1 receive = (OrderFunctionCode1) message;
byte[] srcBytes = receive.getSrc();
byte[] destBytes = receive.getDest();
String strSrc = ByteUtil.bytesToHexString(srcBytes);
//服务器回复设备登录请求
BackFunctionCode1 model = OrderModel.BackFunctionCode1(destBytes, srcBytes);
//直接往本次session写回去
session.write(model);
//保存连接
sessions_cz.put(strSrc, session);
remoteAddress.put(session.getRemoteAddress().toString(), strSrc);
//设备重新连接上,则移除延时推送的任务
JobGroup jobGroup = new JobGroup();
jobGroup.setJobName(strSrc);
jobGroup.setTriggerName(strSrc);
scheduleJob.deleteJob(jobGroup);
try {
//更新设备信息
deviceService.update(receive);
// if(device==null){
// String HexDump = ByteUtil.toHex(receive.getType())+ByteUtil.toHex(receive.getCheck_code())+ByteUtil.bytesToHexString(receive.getSrc())+ByteUtil.bytesToHexString(destBytes)+ByteUtil.toHex(receive.getRemote_len())+ByteUtil.toHex(receive.getVendor())+ByteUtil.toHex(receive.getHardware_type())+ByteUtil.toHex(receive.getVersion())+ByteUtil.bytesToHexString(receive.getLogin_ip())+ByteUtil.bytesToHexString(receive.getCrc16_code());
// log.error("HexDump:{}",HexDump);
// log.error("factoryCode or hardwareType not exist:macAddress:{},factoryCode:{},hardwareType:{}",strSrc,ByteUtil.toHex(receive.getVendor()),ByteUtil.toHex(receive.getHardware_type()));
// }
// else{
//记录登录时间
LoginRecord loginRecord = new LoginRecord();
loginRecord.setMacAddress(strSrc);
loginRecord.setLoginTime(new Date());
deviceService.save(loginRecord);
// }
} catch (Exception e) {
log.error("save device login error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}
//APP登录服务器
else if(message instanceof OrderFunctionCode0) {
OrderFunctionCode0 receive = (OrderFunctionCode0) message;
byte[] srcBytes = receive.getSrc();
byte[] destBytes = receive.getDest();
String strDest = ByteUtil.bytesToHexString(receive.getDest());
//设备是否在线
IoSession tmpSession = sessions_cz.get(strDest);
//1表示在线0表示离线
byte login_status = 0;
if(tmpSession!=null && tmpSession.getRemoteAddress()!=null){
login_status = 1;
}
else {
login_status = 0;
}
//服务器回复APP登录请求
BackFunctionCode0 model = OrderModel.BackFunctionCode0(destBytes, srcBytes ,login_status);
session.write(model);
//保存APP连接
CopyOnWriteArraySet<IoSession> sessionSet = sessions_sjs.get(strDest);
if(sessionSet==null){
sessionSet = new CopyOnWriteArraySet<IoSession>();
sessionSet.add(session);
sessions_sjs.put(strDest, sessionSet);
}
else{
for(IoSession ioSession : sessionSet) {
if(ioSession==null || ioSession.getRemoteAddress()==null){
sessionSet.remove(ioSession);
}
}
sessionSet.add(session);
}
}
//模块版本号
else if(message instanceof OrderFunctionCode16) {
OrderFunctionCode16 receive = (OrderFunctionCode16) message;
byte[] macBytes = receive.getSrc();
String strSrc = ByteUtil.bytesToHexString(macBytes);
try {
//查找设备
Device device = this.deviceService.getUniqueByProperty("macAddress", strSrc);
if(device!=null){
//是否升级
if(device.getIsUpgrade().equals(BooleanEnum.YES.getKey())){
device.setIsUpgrade(BooleanEnum.NO.getKey());
device.setUpgradeTime(new Date());
//升级版本
int upgradeVersion = device.getUpgradeVersion();
OrderFunctionCode15 model = OrderModel.OrderFunctionCode15(macBytes,upgradeVersion);
session.write(model);
}
//记录参数
device.setSdkVersion(receive.getVersion() & 0xff);
device.setSdkTime(IfishUtil.StrToDate(receive.getYear()+"-"+(receive.getMonth() & 0xff)+"-"+(receive.getDay() & 0xff)));
this.deviceService.update(device);
}
} catch (Exception e) {
log.error("save device sdk version16 error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}
//模块版本号
else if(message instanceof OrderFunctionCode17) {
OrderFunctionCode17 receive = (OrderFunctionCode17) message;
byte[] macBytes = receive.getSrc();
String strSrc = ByteUtil.bytesToHexString(macBytes);
try {
//查找设备
Device device = this.deviceService.getUniqueByProperty("macAddress", strSrc);
if(device!=null){
//是否升级
if(device.getIsUpgrade().equals(BooleanEnum.YES.getKey())){
device.setIsUpgrade(BooleanEnum.NO.getKey());
device.setUpgradeTime(new Date());
//升级版本
int upgradeVersion = device.getUpgradeVersion();
OrderFunctionCode15 model = OrderModel.OrderFunctionCode15(macBytes,upgradeVersion);
session.write(model);
}
//记录参数
device.setSdkVersion(receive.getVersion() & 0xff);
device.setSdkTime(IfishUtil.StrToDate(receive.getYear()+"-"+(receive.getMonth() & 0xff)+"-"+(receive.getDay() & 0xff)));
device.setNumber1(receive.getNumber1());
device.setNumber2(receive.getNumber2());
device.setNumber3(receive.getNumber3());
device.setNumber4(receive.getNumber4());
device.setNumber5(receive.getNumber5());
this.deviceService.update(device);
}
} catch (Exception e) {
log.error("save device sdk version17 error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}
}
/**
* 发送数据APP
* @param message 发送对象
* @return
*/
public void sendToApp(IoSession session,String strDest,Object message) {
//回复设备
if(isReplay){
BackFunctionCode8 model = OrderModel.BackFunctionCode8(strDest);
session.write(model);
}
//转发给APP
CopyOnWriteArraySet<IoSession> sessionSet = sessions_sjs.get(strDest);
if(sessionSet!=null && sessionSet.size()>0){
for (IoSession sj_session : sessionSet) {
if(sj_session!=null && sj_session.getRemoteAddress()!=null){
sj_session.write(message);
}
else{
sessionSet.remove(sj_session);
}
}
}
}
/**
* 发送数据给设备
* @param message 发送对象
* @return
*/
public void sendToDevice(IoSession session,String strDest,Object message) {
IoSession cz_session = sessions_cz.get(strDest);
if(cz_session!=null && cz_session.getRemoteAddress()!=null){
cz_session.write(message);
}
else{
//设备离线
byte[] bytes = ByteUtil.hexStringToBytes2(strDest);
BackFunctionCode0 model = OrderModel.BackFunctionCode0(bytes, bytes ,(byte)0);
session.write(model);
}
}
/**
* 智能加热棒 水流量提醒
* @param strSrc
* @param title
* @param contont
*/
public void pushNotifcationByPh(String strSrc,String title,String contont) {
Device device = deviceService.getUniqueByProperty("macAddress", strSrc);
try {
if(device!=null) {
Integer deviceId = device.getDeviceId();
//绑定设备的用户
List<DeviceUser> list = deviceService.getListByProperty(device.getDeviceId());
for (DeviceUser deviceUser : list) {
Integer userId = deviceUser.getPriId().getUserId();
String showName = deviceUser.getShowName();
String msg = "" + title + "】你的水族箱“" + showName + "" + contont;
setPushMsg(title, device, deviceId, userId, showName, msg, PushTypeEnum.ph_warn);
}
}
}catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
}
}
/**
* 温度预警推送
* @param strSrc
* @param title
* @param contont
*/
public void pushNotifcation(String strSrc,String title,String contont){
try {
Device device = deviceService.getUniqueByProperty("macAddress", strSrc);
if(device!=null){
Integer deviceId = device.getDeviceId();
//是否开启预警
String onOff = device.getOnOff();
if(onOff!=null && onOff.equals(BooleanEnum.YES.getKey())){
//宠物笼设备
if (device.getHardwareType().equals("3f")) {
List<DevicePetUser> petUserList = deviceService.getPetListByProperty(deviceId);
for (DevicePetUser deviceUser : petUserList) {
Integer userId = deviceUser.getPriId().getUserId();
String showName = deviceUser.getShowName();
String timestamp = IfishUtil.format(new Date());
String msg = ""+title+"】你的宠物笼“"+showName+"”在"+timestamp+contont;
setPushMsg(title, device, deviceId, userId, showName, msg, PushTypeEnum.wendu_warn);
}
} else {
//绑定设备的用户
List<DeviceUser> list = deviceService.getListByProperty(device.getDeviceId());
for (DeviceUser deviceUser : list) {
Integer userId = deviceUser.getPriId().getUserId();
String showName = deviceUser.getShowName();
String timestamp = IfishUtil.format(new Date());
String msg = ""+title+"】你的水族箱“"+showName+"”在"+timestamp+contont;
setPushMsg(title, device, deviceId, userId, showName, msg, PushTypeEnum.wendu_warn);
}
}
}
}
} catch (Exception e) {
log.error("pushNotifcation error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}
private void setPushMsg(String title, Device device, Integer deviceId, Integer userId, String showName, String msg, PushTypeEnum wendu_warn) throws JMSException {
User user = userService.findById(userId);
if (user != null) {
//推送记录
PushList pushList = new PushList();
//记录推送
pushList.setUserId(userId);
pushList.setDeviceId(deviceId);
pushList.setPhoneType("ALL");
pushList.setShowName(showName);
pushList.setPushType(wendu_warn.getKey());
pushList.setPushTitle(title);
pushList.setPushContext(msg);
pushList.setNumber1(device.getNumber1());
pushList.setNumber2(device.getNumber2());
pushList.setNumber3(device.getNumber3());
pushList.setNumber4(device.getNumber4());
pushList.setNumber5(device.getNumber5());
JSONObject data = JSON.parseObject(JSON.toJSONString(pushList));
QueueEventEntity eventEntity = new QueueEventEntity();
eventEntity.setEventName("com.ifish7.mq.queues.event.PushNotifcationEvent");
eventEntity.setEventProcess("deviceNotifcationPlus");
QueueEventBody eventBody = new QueueEventBody("com.ifish7.mq.business.user.entity.TblPushList", data);
eventEntity.setEventBody(eventBody);
//推送至消息推送队列
sendPushQueueMessage(JSONObject.toJSONString(eventEntity));
}
}
/**
* 推送Push消息队列
* @param json 内容
*/
private void sendPushQueueMessage(final String json) throws JMSException{
jmsTemplate.send(ifish7PushQueueDestination,(Session session) -> session.createTextMessage(json));
}
/*public void pushNotifcation(String strSrc,String title,String contont){
try {
Device device = deviceService.getUniqueByProperty("macAddress", strSrc);
if(device!=null){
Integer deviceId = device.getDeviceId();
//是否开启预警
String onOff = device.getOnOff();
if(onOff!=null && onOff.equals(BooleanEnum.YES.getKey())){
//绑定设备的用户
List<DeviceUser> list = deviceService.getListByProperty(device.getDeviceId());
for (DeviceUser deviceUser : list) {
Integer userId = deviceUser.getPriId().getUserId();
String showName = deviceUser.getShowName();
String timestamp = IfishUtil.format(new Date());
String msg = "【"+title+"】你的水族箱“"+showName+"”在"+timestamp+contont;
User user = userService.findById(userId);
if(user!=null){
//云信推送
boolean neteaseBln = false;
//极光推送
boolean jpushIosBln = false;
boolean jpushAndroidBln = false;
//发送云信消息
Map<String, String> neteaseMap = neteaseIM.sendMsg("ifish", userId.toString(), msg);
if(neteaseMap!=null){
neteaseBln = neteaseMap.get("code").equals(NeteaseEnum.status200.getKey());
}
String loginType = user.getLoginType();
if(loginType!=null){
//推送记录
PushList pushList = new PushList();
Map<String,String> map = new HashMap<String, String>();
map.put("device_id", deviceId.toString());
map.put("device_name", showName);
map.put("timestamp", timestamp);
map.put("msg_type", PushTypeEnum.wendu_warn.getKey());
//记录推送
if(jpushIosBln || jpushAndroidBln || neteaseBln){
pushList.setUserId(userId);
pushList.setDeviceId(deviceId);
pushList.setPhoneType(loginType.toLowerCase());
pushList.setShowName(showName);
pushList.setPushType(PushTypeEnum.wendu_warn.getKey());
pushList.setPushTitle(title);
pushList.setPushContext(msg);
pushList.setNeteaseStatus(neteaseBln?BooleanEnum.YES.getKey():BooleanEnum.NO.getKey());
pushList.setCreateTime(new Date());
pushList.setNumber1(device.getNumber1());
pushList.setNumber2(device.getNumber2());
pushList.setNumber3(device.getNumber3());
pushList.setNumber4(device.getNumber4());
pushList.setNumber5(device.getNumber5());
this.userService.save(pushList);
}
}
}
}
}
}
} catch (Exception e) {
log.error("pushNotifcation error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}*/
}