数据修改推送队列

This commit is contained in:
yiyan 2019-04-10 17:33:03 +08:00
parent a7e8cb7884
commit c5dbae94d8
10 changed files with 428 additions and 13 deletions

32
pom.xml
View File

@ -9,6 +9,26 @@
<url>http://mvnrepository.com</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.1.6.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.4</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
@ -103,7 +123,13 @@
<artifactId>httpclient</artifactId>
<version>4.3.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@ -148,8 +174,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>

View File

@ -0,0 +1,37 @@
package com.ifish.entity.event;
import com.alibaba.fastjson.JSONObject;
/**
* @author: yan.y
* @Description:
* @Date: Created in 16:58 2019-04-10
* @Modified by:
*/
public class QueueEventBody {
public QueueEventBody(String entity, JSONObject data) {
this.entity = entity;
this.data = data;
}
private String entity;
private JSONObject data;
public String getEntity() {
return entity;
}
public void setEntity(String entity) {
this.entity = entity;
}
public JSONObject getData() {
return data;
}
public void setData(JSONObject data) {
this.data = data;
}
}

View File

@ -0,0 +1,42 @@
package com.ifish.entity.event;
import com.alibaba.fastjson.JSONObject;
/**
* @author: yan.y
* @Description:
* @Date: Created in 16:46 2019-04-10
* @Modified by:
*/
public class QueueEventEntity {
private String eventProcess;
private String eventName;
private QueueEventBody eventBody;
public String getEventProcess() {
return eventProcess;
}
public void setEventProcess(String eventProcess) {
this.eventProcess = eventProcess;
}
public String getEventName() {
return eventName;
}
public void setEventName(String eventName) {
this.eventName = eventName;
}
public QueueEventBody getEventBody() {
return eventBody;
}
public void setEventBody(QueueEventBody eventBody) {
this.eventBody = eventBody;
}
}

View File

@ -4,7 +4,10 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import com.alibaba.fastjson.JSONObject;
import com.ifish.entity.*;
import com.ifish.entity.event.QueueEventBody;
import com.ifish.entity.event.QueueEventEntity;
import com.ifish.socketNew.model.receive.*;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
@ -29,6 +32,10 @@ import com.ifish.socketNew.model.send.OrderFunctionCode9;
import com.ifish.socketNew.util.OrderModel;
import com.ifish.util.ByteUtil;
import com.ifish.util.IfishUtil;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Destination;
import javax.jms.Session;
/**
@ -44,6 +51,12 @@ public class SomeServer {
private ScheduleJob scheduleJob;
@Autowired
private NeteaseIM neteaseIM;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination ifish7DataQueueDestination;
@Autowired
private Destination ifish7PushQueueDestination;
//是否回复心跳
public static boolean isReplay = true;
@ -83,15 +96,31 @@ public class SomeServer {
BackFunctionCodeHeater heater = (BackFunctionCodeHeater) message;
String macAddress = ByteUtil.bytesToHexString(heater.getSrc());
String hour = String.valueOf(Calendar.getInstance().get(Calendar.HOUR_OF_DAY));
List<DeviceHeater> deviceHeaters = deviceService.getDeviceHeaterByProperty(hour, macAddress,IfishUtil.format1(new Date()));
if (deviceHeaters.size() > 0) {
DeviceHeater deviceHeater = deviceHeaters.get(0);
deviceHeater.setHeaterWaterTemperature(String.valueOf(heater.getWaterTemperature()));
deviceHeater.setHeaterPh(String.valueOf(heater.getPh()));
deviceService.update(deviceHeater);
} else {
deviceService.save(heater);
}
// List<DeviceHeater> deviceHeaters = deviceService.getDeviceHeaterByProperty(hour, macAddress,IfishUtil.format1(new Date()));
// if (deviceHeaters.size() > 0) {
// DeviceHeater deviceHeater = deviceHeaters.get(0);
// deviceHeater.setHeaterWaterTemperature(String.valueOf(heater.getWaterTemperature()));
// deviceHeater.setHeaterPh(String.valueOf(heater.getPh()));
// deviceService.update(deviceHeater);
// } else {
// deviceService.save(heater);
// }
JSONObject data = new JSONObject();
data.put("heaterMacAddress",macAddress);
data.put("heaterWaterTemperature",heater.getHeatingTemperature());
data.put("heaterPh",heater.getPh());
data.put("heaterGatheringDate",IfishUtil.format1(new Date()));
data.put("heaterGatheringTime",hour);
QueueEventEntity eventEntity = new QueueEventEntity();
eventEntity.setEventName("com.ifish7.mq.queues.event.IntelligentHeatingRodEvent");
eventEntity.setEventProcess("intelligentHeatingRodSaveOrUpdate");
QueueEventBody eventBody = new QueueEventBody("com.ifish7.mq.business.device.entity.TblDeviceHeater",data);
eventEntity.setEventBody(eventBody);
//智能加热棒数据更新及保存
sendDataQueueMessage(JSONObject.toJSONString(eventEntity));
//设备重新连接上则移除延时推送的任务
JobGroup jobGroup = new JobGroup();
jobGroup.setJobName(macAddress);
@ -151,6 +180,26 @@ public class SomeServer {
jobGroup.setTriggerName(strSrc);
scheduleJob.deleteJob(jobGroup);
try {
//macAddr地址
String stcMac = ByteUtil.bytesToHexString(model.getSrc());
//IP
byte[] log_Ip= receive.getLogin_ip();
StringBuilder ipStr = new StringBuilder();
for (int i = 0; i < log_Ip.length; i++) {
int v = log_Ip[i] & 0xff;
if(i==0){
ipStr.append(v);
}
else{
ipStr.append("."+v);
}
}
Integer version = receive.getVersion() & 0xff;
String factoryCode = ByteUtil.toHex(receive.getVendor());
factoryCode = factoryCode.equals("01")?"0a":factoryCode;
String typeCode = ByteUtil.toHex(receive.getHardware_type());
//更新设备信息
deviceService.update(receive);
// if(device==null){
@ -376,4 +425,20 @@ public class SomeServer {
log.error("pushNotifcation error:macAddress:{},error msg:{}",strSrc,e.toString());
}
}
/**
* 推送Data消息队列
* @param json 内容
*/
private void sendDataQueueMessage(final String json){
jmsTemplate.send(ifish7DataQueueDestination,(Session session) -> session.createTextMessage(json));
}
/**
* 推送Push消息队列
* @param json 内容
*/
private void sendPushQueueMessage(final String json) {
jmsTemplate.send(ifish7PushQueueDestination,(Session session) -> session.createTextMessage(json));
}
}

View File

@ -0,0 +1,11 @@
package com.ifish.util;
/**
* @author: yan.y
* @Description: json工具
* @Date: Created in 16:46 2019-04-10
* @Modified by:
*/
public class QueueMessageJsonUtil {
}

View File

@ -3,11 +3,14 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd"
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd"
default-lazy-init="true">
<!-- 注册识别注解 -->
@ -15,6 +18,7 @@
<!-- 导入外部的properties文件-->
<context:property-placeholder location="classpath*:jdbc.properties" ignore-unresolvable="true"/>
<context:property-placeholder location="classpath*:mq.properties" ignore-unresolvable="true"/>
<!-- c3p0连接池 -->
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
<property name="driverClass" value="${c3p0.driverClassName}" />
@ -74,4 +78,36 @@
<value>${netease.appSecret}</value>
</constructor-arg>
</bean>
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${broker_url}"
userName="${username}"
password="${password}" />
<!-- 配置JMS连接工长 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 定义数据存储消息队列Queue -->
<bean id="ifish7DataQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>${queue_data_name}</value>
</constructor-arg>
</bean>
<!-- 定义消息推送消息队列Queue -->
<bean id="ifish7PushQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>${queue_push_name}</value>
</constructor-arg>
</bean>
</beans>

View File

@ -0,0 +1,5 @@
broker_url=tcp://localhost:61616
username=admin
password=admin
queue_data_name=ifishDataMq
queue_push_name=ifishPushMq

View File

@ -0,0 +1,75 @@
package com.ifish.socketNew;
import com.alibaba.fastjson.JSONObject;
import com.ifish.entity.event.QueueEventBody;
import com.ifish.entity.event.QueueEventEntity;
import com.ifish.util.IfishUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.Destination;
import javax.jms.Session;
import java.util.Date;
/**
* @author: yan.y
* @Description: 测试队列消息发送
* @Date: Created in 16:20 2019-04-10
* @Modified by:
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:application-context.xml")
public class SomeServerQueueTest {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination ifish7DataQueueDestination;
@Autowired
private Destination ifish7PushQueueDestination;
@Test
public void testDataMessageSend(){
String json = "this's a test message";
JSONObject data = new JSONObject();
data.put("heaterMacAddress","12345");
data.put("heaterWaterTemperature","130");
data.put("heaterPh","1200");
data.put("heaterGatheringDate", IfishUtil.format1(new Date()));
data.put("heaterGatheringTime","17");
QueueEventEntity eventEntity = new QueueEventEntity();
eventEntity.setEventName("com.ifish7.mq.queues.event.IntelligentHeatingRodEvent");
eventEntity.setEventProcess("intelligentHeatingRodSaveOrUpdate");
QueueEventBody eventBody = new QueueEventBody("com.ifish7.mq.business.device.entity.TblDeviceHeater",data);
eventEntity.setEventBody(eventBody);
String eventEntityStr = JSONObject.toJSONString(eventEntity);
jmsTemplate.send(ifish7DataQueueDestination,(Session session) -> session.createTextMessage(eventEntityStr));
}
@Test
public void testPushMessageSend(){
String json = "this's a test message";
JSONObject data = new JSONObject();
data.put("heaterMacAddress","12345");
data.put("heaterWaterTemperature","130");
data.put("heaterPh","1200");
data.put("heaterGatheringDate", IfishUtil.format1(new Date()));
data.put("heaterGatheringTime","17");
QueueEventEntity eventEntity = new QueueEventEntity();
eventEntity.setEventName("com.ifish7.mq.queues.event.IntelligentHeatingRodEvent");
eventEntity.setEventProcess("intelligentHeatingRodSaveOrUpdate");
QueueEventBody eventBody = new QueueEventBody("com.ifish7.mq.business.device.entity.TblDeviceHeater",data);
eventEntity.setEventBody(eventBody);
String eventEntityStr = JSONObject.toJSONString(eventEntity);
//智能加热棒数据更新及保存
jmsTemplate.send(ifish7PushQueueDestination,(Session session) -> session.createTextMessage(eventEntityStr));
}
}

View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd"
default-lazy-init="true">
<!-- 注册识别注解 -->
<context:component-scan base-package="com.ifish.daoImpl,com.ifish.serviceImpl"/>
<!-- 导入外部的properties文件-->
<context:property-placeholder location="classpath*:jdbc.properties" ignore-unresolvable="true"/>
<context:property-placeholder location="classpath*:mq.properties" ignore-unresolvable="true"/>
<!-- c3p0连接池 -->
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
<property name="driverClass" value="${c3p0.driverClassName}" />
<property name="jdbcUrl" value="${c3p0.url}" />
<property name="user" value="${c3p0.username}" />
<property name="password" value="${c3p0.password}" />
<property name="autoCommitOnClose" value="${c3p0.autoCommitOnClose}" />
<property name="checkoutTimeout" value="${c3p0.checkoutTimeout}" />
<property name="initialPoolSize" value="${c3p0.initialPoolSize}" />
<property name="minPoolSize" value="${c3p0.minPoolSize}" />
<property name="maxPoolSize" value="${c3p0.maxPoolSize}" />
<property name="maxIdleTime" value="${c3p0.maxIdleTime}" />
<property name="acquireIncrement" value="${c3p0.acquireIncrement}" />
<property name="idleConnectionTestPeriod" value="${c3p0.idleConnectionTestPeriod}" />
</bean>
<!-- sessionFactory工厂 -->
<bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="mappingLocations">
<list>
<value>classpath:/com/ifish/entity/hbm/*.hbm.xml</value>
</list>
</property>
<property name="hibernateProperties">
<props>
<prop key="hibernate.dialect">${hibernate.dialect}</prop>
<prop key="hibernate.show_sql">${hibernate.show_sql}</prop>
<prop key="hibernate.format_sql">${hibernate.format_sql}</prop>
<prop key="hibernate.hbm2ddl.auto">${hibernate.hbm2ddl.auto}</prop>
<prop key="hibernate.query.substitutions">${hibernate.query.substitutions}</prop>
<prop key="hibernate.jdbc.batch_size">${"hibernate.jdbc.batch_size"}</prop>
<prop key="hibernate.cache.use_second_level_cache">${hibernate.cache.use_second_level_cache}</prop>
<prop key="hibernate.cache.use_query_cache">${hibernate.cache.use_query_cache}</prop>
<prop key="hibernate.cache.region.factory_class">${hibernate.cache.region.factory_class}</prop>
</props>
</property>
</bean>
<!-- 开启事务注解 -->
<tx:annotation-driven transaction-manager="txManager" />
<!-- Hibernate事务管理器 -->
<bean id="txManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory" />
</bean>
<!-- 导入外部的properties文件-->
<context:property-placeholder location="classpath*:jPpush.properties" ignore-unresolvable="true"/>
<!-- 云信 -->
<bean id="neteaseIM" class="com.ifish.netease.NeteaseIM">
<constructor-arg index="0">
<!-- appKey -->
<value>${netease.appKey}</value>
</constructor-arg>
<constructor-arg index="1">
<!-- appSecret -->
<value>${netease.appSecret}</value>
</constructor-arg>
</bean>
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${broker_url}"
userName="${username}"
password="${password}" />
<!-- 配置JMS连接工长 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 定义数据存储消息队列Queue -->
<bean id="ifish7DataQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>${queue_data_name}</value>
</constructor-arg>
</bean>
<!-- 定义消息推送消息队列Queue -->
<bean id="ifish7PushQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>${queue_push_name}</value>
</constructor-arg>
</bean>
</beans>

View File

@ -0,0 +1,5 @@
broker_url=tcp://localhost:61616
username=admin
password=admin
queue_data_name=ifishDataMq
queue_push_name=ifishPushMq