mqtt监听

This commit is contained in:
yiyan 2019-04-09 00:46:23 +08:00
parent 6ce2d80599
commit 936dc784f4
14 changed files with 189 additions and 105 deletions

View File

@ -27,13 +27,23 @@
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" /> <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
<excludeFolder url="file://$MODULE_DIR$/src/main/webapp/WEB-INF/classes" />
<excludeFolder url="file://$MODULE_DIR$/target" /> <excludeFolder url="file://$MODULE_DIR$/target" />
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" name="Maven: com.aliyun:aliyun-java-sdk-push:3.10.1" level="project" />
<orderEntry type="library" name="Maven: com.aliyun:aliyun-java-sdk-core:4.4.0" level="project" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.8.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.11" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.1" level="project" />
<orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
<orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="library" name="Maven: org.jacoco:org.jacoco.agent:runtime:0.8.3" level="project" />
<orderEntry type="library" name="Maven: org.ini4j:ini4j:0.5.4" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.26" level="project" />
<orderEntry type="library" name="Maven: org.apache.activemq:activemq-all:5.15.4" level="project" /> <orderEntry type="library" name="Maven: org.apache.activemq:activemq-all:5.15.4" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-core:5.0.8.RELEASE" level="project" /> <orderEntry type="library" name="Maven: org.springframework:spring-core:5.0.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-jcl:5.0.8.RELEASE" level="project" /> <orderEntry type="library" name="Maven: org.springframework:spring-jcl:5.0.8.RELEASE" level="project" />
@ -49,18 +59,17 @@
<orderEntry type="library" name="Maven: org.springframework:spring-jms:5.0.8.RELEASE" level="project" /> <orderEntry type="library" name="Maven: org.springframework:spring-jms:5.0.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-messaging:5.0.8.RELEASE" level="project" /> <orderEntry type="library" name="Maven: org.springframework:spring-messaging:5.0.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.apache.xbean:xbean-spring:3.16" level="project" /> <orderEntry type="library" name="Maven: org.apache.xbean:xbean-spring:3.16" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.0.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework:spring-test:5.0.8.RELEASE" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.springframework:spring-test:5.0.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.mybatis:mybatis:3.4.5" level="project" /> <orderEntry type="library" name="Maven: org.mybatis:mybatis:3.4.5" level="project" />
<orderEntry type="library" name="Maven: org.mybatis:mybatis-spring:1.3.2" level="project" /> <orderEntry type="library" name="Maven: org.mybatis:mybatis-spring:1.3.2" level="project" />
<orderEntry type="library" name="Maven: com.alibaba:druid:1.1.6" level="project" /> <orderEntry type="library" name="Maven: com.alibaba:druid:1.1.6" level="project" />
<orderEntry type="library" name="Maven: org.aspectj:aspectjweaver:1.8.4" level="project" /> <orderEntry type="library" name="Maven: org.aspectj:aspectjweaver:1.8.4" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" /> <orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: org.projectlombok:lombok:1.18.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.2" level="project" /> <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.2" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.6" level="project" /> <orderEntry type="library" name="Maven: commons-codec:commons-codec:1.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
<orderEntry type="library" name="Maven: commons-httpclient:commons-httpclient:3.1" level="project" /> <orderEntry type="library" name="Maven: commons-httpclient:commons-httpclient:3.1" level="project" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.3" level="project" /> <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.56" level="project" />
</component> </component>
</module> </module>

15
pom.xml
View File

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.ifish7</groupId> <groupId>com.ifish7</groupId>
<artifactId>ifish7.ifishMQ</artifactId> <artifactId>ifishMQ</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
<packaging>war</packaging> <packaging>war</packaging>
@ -32,6 +32,19 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- 阿里云推送 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-push</artifactId>
<version>3.10.1</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>[4.3.2,5.0.0)</version>
</dependency>
<!-- Active MQ --> <!-- Active MQ -->
<dependency> <dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>

View File

@ -1,10 +0,0 @@
package com.ifish7.mq;
/**
* @author: yan.y
* @Description:
* @Date: Created in 14:33 2018/11/18
* @Modified by:
*/
public class Main {
}

View File

@ -1,36 +0,0 @@
package com.ifish7.mq.activemq;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class ConsumerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
/**
* 接收消息
* @param destination
* @return
*/
public TextMessage receive(Destination destination) {
TextMessage tm=(TextMessage)jmsTemplate.receive(destination);
List list = new ArrayList<>();
try {
System.out.println("从队列--"+destination.toString()+"--接收到消息---"+tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
return tm;
}
}

View File

@ -1,24 +0,0 @@
package com.ifish7.mq.activemq.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
public class QueueMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("收到的消息-----"+tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,46 @@
package com.ifish7.mq.mqtt.client;
import lombok.extern.log4j.Log4j;
import org.springframework.jms.listener.SessionAwareMessageListener;
import javax.jms.*;
/**
* @author: yan.y
* @Description: 客户端(android,IOS) mqtt消息监听
* @Date: Created in 23:21 2019/4/8
*/
@Log4j
public class ClientMqttMessageListener implements SessionAwareMessageListener<BytesMessage> {
@Override
public void onMessage(BytesMessage message, Session session) {
try {
Destination destination = message.getJMSDestination();
String topic = destination.toString();
long length = message.getBodyLength();
byte[] b = new byte[(int) length];
message.readBytes(b);
String s = new String(b);
log.info("Mqtt - message : " + s);
log.info("Mqtt - Topic : " + topic);
} catch (JMSException e) {
log.error(e.getMessage(),e);
}
}
public String bytesToHexString(byte[] src){
StringBuilder stringBuilder = new StringBuilder();
if (src == null || src.length <= 0) {
return null;
}
for (int i = 0; i < src.length; i++) {
int v = src[i] & 0xFF;
String hv = Integer.toHexString(v);
if (hv.length() < 2) {
stringBuilder.append(0);
}
stringBuilder.append(hv);
}
return stringBuilder.toString();
}
}

View File

@ -0,0 +1,31 @@
package com.ifish7.mq.queues.listener;
import lombok.extern.log4j.Log4j;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @author: yan.y
* @Description: 数据推送队列监听
* @Date: Created in 21:24 2019/4/8
*/
@Log4j
public class IfishDataQueueMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
log.info("Ifish7 - Data : " + tm.getText());
} catch (JMSException e) {
log.error(e);
}
}
}

View File

@ -0,0 +1,27 @@
package com.ifish7.mq.queues.listener;
import lombok.extern.log4j.Log4j;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @author: yan.y
* @Description: 消息推送队列监听
* @Date: Created in 21:24 2019/4/8
*/
@Log4j
public class IfishPushQueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
log.info("Ifish7 - Push : " + tm.getText());
} catch (JMSException e) {
log.error(e);
}
}
}

View File

@ -4,15 +4,15 @@ log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Target=System.out log4j.appender.Console.Target=System.out
#\u53EF\u4EE5\u7075\u6D3B\u5730\u6307\u5B9A\u65E5\u5FD7\u8F93\u51FA\u683C\u5F0F\uFF0C\u4E0B\u9762\u4E00\u884C\u662F\u6307\u5B9A\u5177\u4F53\u7684\u683C\u5F0F #\u53EF\u4EE5\u7075\u6D3B\u5730\u6307\u5B9A\u65E5\u5FD7\u8F93\u51FA\u683C\u5F0F\uFF0C\u4E0B\u9762\u4E00\u884C\u662F\u6307\u5B9A\u5177\u4F53\u7684\u683C\u5F0F
log4j.appender.Console.layout = org.apache.log4j.PatternLayout log4j.appender.Console.layout = org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=[%c] - %m%n log4j.appender.Console.layout.ConversionPattern=[ifishMQ][%c] - %m%n
#\u6587\u4EF6\u5927\u5C0F\u5230\u8FBE\u6307\u5B9A\u5C3A\u5BF8\u7684\u65F6\u5019\u4EA7\u751F\u4E00\u4E2A\u65B0\u7684\u6587\u4EF6 #\u6587\u4EF6\u5927\u5C0F\u5230\u8FBE\u6307\u5B9A\u5C3A\u5BF8\u7684\u65F6\u5019\u4EA7\u751F\u4E00\u4E2A\u65B0\u7684\u6587\u4EF6
log4j.appender.File = org.apache.log4j.RollingFileAppender log4j.appender.File = org.apache.log4j.RollingFileAppender
#\u6307\u5B9A\u8F93\u51FA\u76EE\u5F55 #\u6307\u5B9A\u8F93\u51FA\u76EE\u5F55
log4j.appender.File.File = logs/dnkx.log log4j.appender.File.File = logs/ifishMQ.log
#\u5B9A\u4E49\u6587\u4EF6\u6700\u5927\u5927\u5C0F #\u5B9A\u4E49\u6587\u4EF6\u6700\u5927\u5927\u5C0F
log4j.appender.File.MaxFileSize = 10MB log4j.appender.File.MaxFileSize = 100
#\u8F93\u51FA\u6240\u4EE5\u65E5\u5FD7\uFF0C\u5982\u679C\u6362\u6210DEBUG\u8868\u793A\u8F93\u51FADEBUG\u4EE5\u4E0A\u7EA7\u522B\u65E5\u5FD7 #\u8F93\u51FA\u6240\u4EE5\u65E5\u5FD7\uFF0C\u5982\u679C\u6362\u6210DEBUG\u8868\u793A\u8F93\u51FADEBUG\u4EE5\u4E0A\u7EA7\u522B\u65E5\u5FD7
log4j.appender.File.Threshold = ALL log4j.appender.File.Threshold = ALL
log4j.appender.File.layout = org.apache.log4j.PatternLayout log4j.appender.File.layout = org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern =[%p] [%d{yyyy-MM-dd HH\:mm\:ss}][%c]%m%n log4j.appender.File.layout.ConversionPattern =[ifishMQ][%p] [%d{yyyy-MM-dd HH\:mm\:ss}][%c]%m%n

View File

@ -0,0 +1,5 @@
broker_url=tcp://localhost:61616
username=admin
password=admin
queue_data_name=ifishDataMq
queue_push_name=ifishPushMq

View File

@ -8,15 +8,18 @@
http://www.springframework.org/schema/context http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd"> http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd">
<bean id="propertyConfigurer"
<context:component-scan base-package="com.ifish7.mq.activemq" /> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:mq.properties" />
</bean>
<context:component-scan base-package="com.ifish7.mq.queues.*" />
<amq:connectionFactory id="amqConnectionFactory" <amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" brokerURL="${broker_url}"
userName="admin" userName="${username}"
password="admin" /> password="${password}" />
<!-- 配置JMS连接工长 --> <!-- 配置JMS连接工长 -->
<bean id="connectionFactory" <bean id="connectionFactory"
@ -25,31 +28,51 @@
<property name="sessionCacheSize" value="100" /> <property name="sessionCacheSize" value="100" />
</bean> </bean>
<!-- 定义消息队列Queue --> <!-- 定义数据存储消息队列Queue -->
<bean id="dnkxQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <bean id="ifish7DataQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 --> <!-- 设置消息队列的名字 -->
<constructor-arg> <constructor-arg>
<value>ifishDataMq</value> <value>${queue_data_name}</value>
</constructor-arg> </constructor-arg>
</bean> </bean>
<!-- 定义消息推送消息队列Queue -->
<!-- 配置JMS模板QueueSpring提供的JMS工具类它发送、接收消息。 --> <bean id="ifish7PushQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 设置消息队列的名字 -->
<property name="connectionFactory" ref="connectionFactory" /> <constructor-arg>
<property name="defaultDestination" ref="dnkxQueueDestination" /> <value>${queue_push_name}</value>
<property name="receiveTimeout" value="10000" /> </constructor-arg>
<!-- true是topicfalse是queue默认是false此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean> </bean>
<!-- 配置消息队列监听者Queue --> <!-- 配置消息队列监听者Queue -->
<bean id="queueMessageListener" class="com.ifish7.mq.activemq.listener.QueueMessageListener" /> <bean id="ifishDataQueueMessageListener" class="com.ifish7.mq.queues.listener.IfishDataQueueMessageListener" />
<bean id="ifishPushQueueMessageListener" class="com.ifish7.mq.queues.listener.IfishPushQueueMessageListener" />
<!-- 显示注入消息监听容器Queue配置连接工厂监听的目标是demoQueueDestination监听器是上面定义的监听器 -->
<bean id="queueListenerContainer" <!-- 监听 -->
<bean id="queueDataListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" /> <property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="dnkxQueueDestination" /> <property name="destination" ref="ifish7DataQueueDestination" />
<property name="messageListener" ref="queueMessageListener" /> <property name="messageListener" ref="ifishDataQueueMessageListener" />
</bean> </bean>
</beans>
<bean id="queuePushListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="ifish7PushQueueDestination" />
<property name="messageListener" ref="ifishPushQueueMessageListener" />
</bean>
<!-- topics -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="ruixin/2a"/>
</bean>
<bean id="consumerSessionAwareMessageListener" class="com.ifish7.mq.mqtt.client.ClientMqttMessageListener"/>
<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="consumerSessionAwareMessageListener" />
</bean>
</beans>