一 spingboot整合mqtt
原理:
 二 操作案例
2.1 工程结构
 2.2 配置pom文件
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> 
<version>4.13</version> <scope>test</scope> </dependency> <!-- mqtt --> 
<dependency> <groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-integration</artifactId> </dependency> 
<dependency> <groupId>org.springframework.integration</groupId> 
<artifactId>spring-integration-stream</artifactId> </dependency> <dependency> 
<groupId>org.springframework.integration</groupId> 
<artifactId>spring-integration-mqtt</artifactId> </dependency> <!-- lombok --> 
<dependency> <groupId>org.projectlombok</groupId> 
<artifactId>lombok</artifactId> <version>1.16.16</version> </dependency> <!-- 
springBoot的启动器 --> <dependency> <groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-web</artifactId> 
<version>2.0.1.RELEASE</version> </dependency> <dependency> 
<groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> 
<version>5.1.5.RELEASE</version> </dependency> 
 2.3 配置application配置文件
server: port: 8081 spring: mqtt: username: admin # 账号 password: public # 密码 
host-url: tcp://172.16.71.150:1883 # mqtt连接tcp地址 client-id: mq-dky-0813 # 
客户端Id,每个启动的id要不同 default-topic: mq-dky-guolu # 默认主题 timeout: 100 # 超时时间 
keepalive: 100 
2.4 读取配置文件,初始客户端
package com.ljf.mqtt.demo.config; import 
com.ljf.mqtt.demo.client.MqttPushClient; import lombok.Getter; import 
lombok.Setter; import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.context.properties.ConfigurationProperties; 
import org.springframework.context.annotation.Bean; import 
org.springframework.stereotype.Component; /** * @ClassName: MqttConfig * 
@Description: TODO * @Author: liujianfu * @Date: 2021/08/16 14:43:39  * 
@Version: V1.0 **/ @Component @ConfigurationProperties("spring.mqtt") @Setter 
@Getter public class MqttConfig { @Autowired private MqttPushClient 
mqttPushClient; /** * 用户名 */ private String username; /** * 密码 */ private 
String password; /** * 连接地址 */ private String hostUrl; /** * 客户Id */ private 
String clientId; /** * 默认连接话题 */ private String defaultTopic; /** * 超时时间 */ 
private int timeout; /** * 保持连接数 */ private int keepalive; @Bean public 
MqttPushClient getMqttPushClient() { mqttPushClient.connect(hostUrl, clientId, 
username, password, timeout, keepalive); // 以/#结尾表示订阅所有以test开头的主题 
mqttPushClient.subscribe(defaultTopic, 0); return mqttPushClient; } } 
2.4 订阅推送客户端
package com.ljf.mqtt.demo.client; import 
com.ljf.mqtt.demo.listener.PushCallback; import 
org.eclipse.paho.client.mqttv3.*; import 
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.stereotype.Component; /** * @ClassName: MqttPushClient * 
@Description: TODO * @Author: liujianfu * @Date: 2021/08/16 14:48:38  * 
@Version: V1.0 **/ @Component public class MqttPushClient { private static 
final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); @Autowired 
private PushCallback pushCallback; private static MqttClient client; private 
static MqttClient getClient() { return client; } private static void 
setClient(MqttClient client) { MqttPushClient.client = client; } /** * 客户端连接 * 
* @param host ip+端口 * @param clientID 客户端Id * @param username 用户名 * @param 
password 密码 * @param timeout 超时时间 * @param keepalive 保留数 */ public void 
connect(String host, String clientID, String username, String password, int 
timeout, int keepalive) { MqttClient client; try { client = new 
MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options 
= new MqttConnectOptions(); options.setCleanSession(true); 
options.setUserName(username); options.setPassword(password.toCharArray()); 
options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); 
MqttPushClient.setClient(client); try { client.setCallback(pushCallback); 
client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch 
(Exception e) { e.printStackTrace(); } } /** * 发布 * * @param qos 连接方式 * @param 
retained 是否保留 * @param topic 主题 * @param pushMessage 消息体 */ public void 
publish(int qos, boolean retained, String topic, String pushMessage) { 
MqttMessage message = new MqttMessage(); message.setQos(qos); 
message.setRetained(retained); message.setPayload(pushMessage.getBytes()); 
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == 
mTopic) { logger.error("topic not exist"); } MqttDeliveryToken token; try { 
token = mTopic.publish(message); token.waitForCompletion(); } catch 
(MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { 
e.printStackTrace(); } } /** * 订阅某个主题 * * @param topic 主题 * @param qos 连接方式 */ 
public void subscribe(String topic, int qos) { 
logger.info("==============开始订阅主题=========" + topic); try { 
MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { 
e.printStackTrace(); } } } 
2.5 定制监听订阅者
package com.ljf.mqtt.demo.listener; import 
com.ljf.mqtt.demo.config.MqttConfig; import 
org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import 
org.eclipse.paho.client.mqttv3.MqttCallback; import 
org.eclipse.paho.client.mqttv3.MqttClient; import 
org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import 
org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.stereotype.Component; /** * @ClassName: PushCallback * 
@Description: TODO * @Author: liujianfu * @Date: 2021/08/16 14:52:20  * 
@Version: V1.0 **/ @Component public class PushCallback implements MqttCallback 
{ private static final Logger logger = 
LoggerFactory.getLogger(PushCallback.class); @Autowired private MqttConfig 
mqttConfig; private static MqttClient client; @Override public void 
connectionLost(Throwable throwable) { // 连接丢失后,一般在这里面进行重连 
logger.info("连接断开,可以做重连"); if (client == null || !client.isConnected()) { 
mqttConfig.getMqttPushClient(); } } @Override public void messageArrived(String 
topic, MqttMessage mqttMessage) throws Exception { // subscribe后得到的消息会执行到这里面 
logger.info("接收消息主题 : " + topic); logger.info("接收消息Qos : " + 
mqttMessage.getQos()); logger.info("接收消息内容 : " + new 
String(mqttMessage.getPayload())); } @Override public void 
deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } } 
2.6 发布数据
package com.ljf.mqtt.demo.controller; import 
com.ljf.mqtt.demo.client.MqttPushClient; import com.ljf.mqtt.demo.utils.R; 
import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.web.bind.annotation.GetMapping; import 
org.springframework.web.bind.annotation.RequestMapping; import 
org.springframework.web.bind.annotation.RestController; /** * @ClassName: 
PullController * @Description: TODO * @Author: liujianfu * @Date: 
2021/08/16 14:56:18  * @Version: V1.0 **/ @RestController @RequestMapping("/") 
public class PullController { @Autowired private MqttPushClient mqttPushClient; 
/** * @author liujianfu * @description 测试发布主题 * @date 2021/8/16 15:04 * @param 
[] * @return RUtils */ @GetMapping(value = "/publishTopic") public R 
publishTopic(String sendMessage) { System.out.println("message:"+sendMessage); 
sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}"; 
mqttPushClient.publish(0,false,"mq-dky-guolu",sendMessage); return R.ok("OK"); 
} } 
2.7 发布数据
1.发布数据:
 2.订阅消费数据
 3.emqx页面
4.在页面进行模拟
连接
订阅
 推送:
java代码客户端: