推送优化

This commit is contained in:
yiyan 2019-06-01 19:53:43 +08:00
parent b32745c772
commit 74ea32f3a0
8 changed files with 224 additions and 63 deletions

21
pom.xml
View File

@ -22,12 +22,11 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<!--<dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId> <artifactId>activemq-all</artifactId>
<version>5.15.4</version> <version>5.15.4</version>
</dependency>--> </dependency>
<dependency> <dependency>
<groupId>org.quartz-scheduler</groupId> <groupId>org.quartz-scheduler</groupId>
@ -125,9 +124,15 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
<version>1.2.56</version> <version>1.2.56</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version>
</dependency> </dependency>
<dependency> <dependency>
@ -174,8 +179,8 @@
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.7</source> <source>1.8</source>
<target>1.7</target> <target>1.8</target>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>

View File

@ -1,8 +1,10 @@
package com.ifish.quartz; package com.ifish.quartz;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ifish.entity.*; import com.ifish.entity.*;
import com.ifish.enums.BooleanEnum; import com.ifish.entity.event.QueueEventBody;
import com.ifish.enums.NeteaseEnum; import com.ifish.entity.event.QueueEventEntity;
import com.ifish.enums.PushTypeEnum; import com.ifish.enums.PushTypeEnum;
import com.ifish.netease.NeteaseIM; import com.ifish.netease.NeteaseIM;
import com.ifish.service.DeviceService; import com.ifish.service.DeviceService;
@ -16,11 +18,12 @@ import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import java.util.Date; import javax.jms.Destination;
import java.util.HashMap; import javax.jms.JMSException;
import javax.jms.Session;
import java.util.List; import java.util.List;
import java.util.Map;
public class ExecuteJob implements Job { public class ExecuteJob implements Job {
@ -31,8 +34,86 @@ public class ExecuteJob implements Job {
private UserService userService; private UserService userService;
@Autowired @Autowired
private DeviceService deviceService; private DeviceService deviceService;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination ifish7PushQueueDestination;
public void execute(JobExecutionContext context) throws JobExecutionException { public void execute(JobExecutionContext context) throws JobExecutionException {
//任务传递的参数
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String macAddress = dataMap.getString("macAddress");
String timestamp = dataMap.getString("timestamp");
try {
log.info("离线时间 --> " + timestamp);
log.info("离线设备 --> " + macAddress);
//移除为null的连接
IoSession session_cz = SomeServer.sessions_cz.get(macAddress);
if(session_cz==null || session_cz.getRemoteAddress()==null){
SomeServer.sessions_cz.remove(macAddress);
}
//设备是否存在
Device device = deviceService.getUniqueByProperty("macAddress", macAddress);
if(device!=null){
//保存离线的设备信息
DeviceOffline deviceOffline = new DeviceOffline();
deviceOffline.setOfflineMacAddress(device.getMacAddress());
deviceOffline.setOfflineFactoryCode(device.getFactoryCode());
deviceOffline.setOfflineDeviceType(device.getHardwareType());
deviceOffline.setOfflineSdkVersion(String.valueOf(device.getSdkVersion()));
deviceOffline.setOfflineTime(IfishUtil.StrToDate1(timestamp));
deviceService.saveDeviceOffline(deviceOffline);
log.info("离线设备信息保存++++++++++++++++++++++++++++++++++++++++++");
List<DeviceUser> list = deviceService.getListByProperty(device.getDeviceId());
//是否被绑定
if(list!=null){
for (DeviceUser deviceUser : list) {
Integer userId = deviceUser.getPriId().getUserId();
String showName = deviceUser.getShowName();
String title = PushTypeEnum.offline_push.getValue();
String msg = ""+title+"】你的水族箱“"+showName+"”于"+timestamp+"离线,请及时查看!";
User user = userService.findById(userId);
if(user!=null){
//推送记录
PushList pushList = new PushList();
//记录推送
pushList.setUserId(userId);
pushList.setDeviceId(deviceUser.getPriId().getDeviceId());
pushList.setPhoneType("ALL");
pushList.setShowName(showName);
pushList.setPushType(PushTypeEnum.offline_push.getKey());
pushList.setPushTitle(title);
pushList.setPushContext(msg);
pushList.setNumber1(device.getNumber1());
pushList.setNumber2(device.getNumber2());
pushList.setNumber3(device.getNumber3());
pushList.setNumber4(device.getNumber4());
pushList.setNumber5(device.getNumber5());
JSONObject data = JSON.parseObject(JSON.toJSONString(pushList));
QueueEventEntity eventEntity = new QueueEventEntity();
eventEntity.setEventName("com.ifish7.mq.queues.event.PushNotifcationEvent");
eventEntity.setEventProcess("deviceNotifcationPlus");
QueueEventBody eventBody = new QueueEventBody("com.ifish7.mq.business.user.entity.TblPushList",data);
eventEntity.setEventBody(eventBody);
//推送至消息推送队列
sendPushQueueMessage(JSONObject.toJSONString(eventEntity));
}
}
}
}
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
/**
* 推送Push消息队列
* @param json 内容
*/
private void sendPushQueueMessage(final String json) throws JMSException {
jmsTemplate.send(ifish7PushQueueDestination,(Session session) -> session.createTextMessage(json));
}
/*public void execute(JobExecutionContext context) throws JobExecutionException {
//任务传递的参数 //任务传递的参数
JobDataMap dataMap = context.getJobDetail().getJobDataMap(); JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String macAddress = dataMap.getString("macAddress"); String macAddress = dataMap.getString("macAddress");
@ -113,5 +194,6 @@ public class ExecuteJob implements Job {
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage(),e); log.error(e.getMessage(),e);
} }
} }*/
} }

View File

@ -1,15 +1,7 @@
package com.ifish.quartz; package com.ifish.quartz;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.quartz.JobBuilder; import org.quartz.*;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -35,7 +27,7 @@ public class ScheduleJob {
scheduler.setJobFactory(jobFactory); scheduler.setJobFactory(jobFactory);
scheduler.start(); scheduler.start();
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error(String.format("start scheduler error:{%s}",e.toString())); log.info(String.format("start scheduler error:{%s}",e.toString()));
} }
} }
/** /**
@ -60,7 +52,7 @@ public class ScheduleJob {
//添加触发器 //添加触发器
scheduler.scheduleJob(jobDetail,trigger); scheduler.scheduleJob(jobDetail,trigger);
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error(String.format("addJob error【macAddress:{%s},jobGroup:{%s},triggerGroup:{%s},errMsg:{%s}】",jobGroup.getMacAddress(),jobGroup.getJobName(),jobGroup.getTriggerGroup(),e.toString())); log.info(String.format("addJob error【macAddress:{%s},jobGroup:{%s},triggerGroup:{%s},errMsg:{%s}】",jobGroup.getMacAddress(),jobGroup.getJobName(),jobGroup.getTriggerGroup(),e.toString()));
// log.error("addJob error【macAddress:{},jobGroup:{},triggerGroup:{},errMsg:{}】",jobGroup.getMacAddress(),jobGroup.getJobName(),jobGroup.getTriggerGroup(),e.toString()); // log.error("addJob error【macAddress:{},jobGroup:{},triggerGroup:{},errMsg:{}】",jobGroup.getMacAddress(),jobGroup.getJobName(),jobGroup.getTriggerGroup(),e.toString());
} }
} }
@ -83,7 +75,7 @@ public class ScheduleJob {
//删除任务 //删除任务
scheduler.deleteJob(jobKey); scheduler.deleteJob(jobKey);
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error(String.format("deleteJob error【macAddress:{%s},jobGroup:{%s},triggerGroup:{%s},errMsg:{%s}】",jobGroup.getMacAddress(),jobGroup.getJobName(),jobGroup.getTriggerGroup(),e.toString())); log.info(String.format("deleteJob error【macAddress:{%s},jobGroup:{%s},triggerGroup:{%s},errMsg:{%s}】",jobGroup.getMacAddress(),jobGroup.getJobName(),jobGroup.getTriggerGroup(),e.toString()));
} }
} }
@ -95,7 +87,7 @@ public class ScheduleJob {
//关闭任务 //关闭任务
scheduler.shutdown(); scheduler.shutdown();
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error(String.format("shutdown scheduler error:{%s}",e.toString())); log.info(String.format("shutdown scheduler error:{%s}",e.toString()));
} }
} }

View File

@ -90,8 +90,8 @@ public class MinaServerHandler extends IoHandlerAdapter {
//离线时间 //离线时间
jobGroup.setTimestamp(IfishUtil.format(new Date())); jobGroup.setTimestamp(IfishUtil.format(new Date()));
//10分钟后推送 update 30分钟 //10分钟后推送 update 30分钟
jobGroup.setStartTime(new Date(System.currentTimeMillis() + 600000L * 3)); // jobGroup.setStartTime(new Date(System.currentTimeMillis() + 600000L * 3));
// jobGroup.setStartTime(new Date(System.currentTimeMillis() + 6000)); jobGroup.setStartTime(new Date(System.currentTimeMillis() + 60000L));
scheduleJob.addJob(jobGroup); scheduleJob.addJob(jobGroup);
} }

View File

@ -1,8 +1,11 @@
package com.ifish.socketNew; package com.ifish.socketNew;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ifish.entity.*; import com.ifish.entity.*;
import com.ifish.entity.event.QueueEventBody;
import com.ifish.entity.event.QueueEventEntity;
import com.ifish.enums.BooleanEnum; import com.ifish.enums.BooleanEnum;
import com.ifish.enums.NeteaseEnum;
import com.ifish.enums.PushTypeEnum; import com.ifish.enums.PushTypeEnum;
import com.ifish.netease.NeteaseIM; import com.ifish.netease.NeteaseIM;
import com.ifish.quartz.JobGroup; import com.ifish.quartz.JobGroup;
@ -22,8 +25,16 @@ import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import java.util.*; import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
@ -41,6 +52,10 @@ public class SomeServer {
private ScheduleJob scheduleJob; private ScheduleJob scheduleJob;
@Autowired @Autowired
private NeteaseIM neteaseIM; private NeteaseIM neteaseIM;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination ifish7PushQueueDestination;
//是否回复心跳 //是否回复心跳
public static boolean isReplay = true; public static boolean isReplay = true;
@ -305,6 +320,12 @@ public class SomeServer {
session.write(model); session.write(model);
} }
} }
/**
* 温度预警推送
* @param strSrc
* @param title
* @param contont
*/
/** /**
* 温度预警推送 * 温度预警推送
* @param strSrc * @param strSrc
@ -312,6 +333,63 @@ public class SomeServer {
* @param contont * @param contont
*/ */
public void pushNotifcation(String strSrc,String title,String contont){ public void pushNotifcation(String strSrc,String title,String contont){
try {
Device device = deviceService.getUniqueByProperty("macAddress", strSrc);
if(device!=null){
Integer deviceId = device.getDeviceId();
//是否开启预警
String onOff = device.getOnOff();
if(onOff!=null && onOff.equals(BooleanEnum.YES.getKey())){
//绑定设备的用户
List<DeviceUser> list = deviceService.getListByProperty(device.getDeviceId());
for (DeviceUser deviceUser : list) {
Integer userId = deviceUser.getPriId().getUserId();
String showName = deviceUser.getShowName();
String timestamp = IfishUtil.format(new Date());
String msg = ""+title+"】你的水族箱“"+showName+"”在"+timestamp+contont;
User user = userService.findById(userId);
if(user!=null){
//推送记录
PushList pushList = new PushList();
//记录推送
pushList.setUserId(userId);
pushList.setDeviceId(deviceId);
pushList.setPhoneType("ALL");
pushList.setShowName(showName);
pushList.setPushType(PushTypeEnum.wendu_warn.getKey());
pushList.setPushTitle(title);
pushList.setPushContext(msg);
pushList.setNumber1(device.getNumber1());
pushList.setNumber2(device.getNumber2());
pushList.setNumber3(device.getNumber3());
pushList.setNumber4(device.getNumber4());
pushList.setNumber5(device.getNumber5());
JSONObject data = JSON.parseObject(JSON.toJSONString(pushList));
QueueEventEntity eventEntity = new QueueEventEntity();
eventEntity.setEventName("com.ifish7.mq.queues.event.PushNotifcationEvent");
eventEntity.setEventProcess("deviceNotifcationPlus");
QueueEventBody eventBody = new QueueEventBody("com.ifish7.mq.business.user.entity.TblPushList",data);
eventEntity.setEventBody(eventBody);
//推送至消息推送队列
sendPushQueueMessage(JSONObject.toJSONString(eventEntity));
}
}
}
}
} catch (Exception e) {
log.error("pushNotifcation error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}
/**
* 推送Push消息队列
* @param json 内容
*/
private void sendPushQueueMessage(final String json) throws JMSException{
jmsTemplate.send(ifish7PushQueueDestination,(Session session) -> session.createTextMessage(json));
}
/*public void pushNotifcation(String strSrc,String title,String contont){
try { try {
Device device = deviceService.getUniqueByProperty("macAddress", strSrc); Device device = deviceService.getUniqueByProperty("macAddress", strSrc);
if(device!=null){ if(device!=null){
@ -374,5 +452,5 @@ public class SomeServer {
} catch (Exception e) { } catch (Exception e) {
log.error("pushNotifcation error:macAddress:{},error msg:{}",strSrc,e.toString()); log.error("pushNotifcation error:macAddress:{},error msg:{}",strSrc,e.toString());
} }
} }*/
} }

View File

@ -1,11 +0,0 @@
package com.ifish.util;
/**
* @author: yan.y
* @Description: json工具
* @Date: Created in 16:46 2019-04-10
* @Modified by:
*/
public class QueueMessageJsonUtil {
}

View File

@ -3,11 +3,14 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context" xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd" http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd"
default-lazy-init="true"> default-lazy-init="true">
<!-- 注册识别注解 --> <!-- 注册识别注解 -->
@ -76,33 +79,43 @@
</constructor-arg> </constructor-arg>
</bean> </bean>
<!--<amq:connectionFactory id="amqConnectionFactory" <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
brokerURL="${broker_url}" <property name="brokerURL" value="${broker_url}" />
userName="${username}" <property name="userName" value="${username}" />
password="${password}" />--> <property name="password" value="${password}" />
<!-- 配置JMS连接工长 --> </bean>
<!--<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>-->
<!--<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
<property name="connectionFactory" ref="connectionFactory"/> destroy-method="stop">
</bean>--> <property name="connectionFactory" ref="connectionFactory"/>
<property name="maxConnections" value="100"></property>
</bean>
<!--使用缓存可以提升效率-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- 定义数据存储消息队列Queue --> <!-- 定义数据存储消息队列Queue -->
<!--<bean id="ifish7DataQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <bean id="ifish7DataQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg> <constructor-arg>
<value>${queue_data_name}</value> <value>${queue_data_name}</value>
</constructor-arg> </constructor-arg>
</bean>--> </bean>
<!-- 定义消息推送消息队列Queue --> <!-- 定义消息推送消息队列Queue -->
<!--<bean id="ifish7PushQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <bean id="ifish7PushQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg> <constructor-arg>
<value>${queue_push_name}</value> <value>${queue_push_name}</value>
</constructor-arg> </constructor-arg>
</bean>--> </bean>
</beans> </beans>

View File

@ -1,5 +1,7 @@
broker_url=tcp://localhost:61616 #broker_url=tcp://test.ifish7.com:61616
broker_url=tcp://www.ifish7.com:61616
username=admin username=admin
password=admin #password=admin
password=adminifish7
queue_data_name=ifishDataMq queue_data_name=ifishDataMq
queue_push_name=ifishPushMq queue_push_name=ifishPushMq