0. 碎碎念
是向小白版的springboot中集成mqtt服务和springboot集成mqtt(超级无敌详细)学习整理的。但是因为我想要弄成一个,不是项目一启动就连接服务器的,而且连接成功服务器就订阅好固定的主题,太菜了一直理不明白(主要是注入的问题一直有点晕)。
目前这个是项目一启动就连接mqtt,然后连接成功后就可以订阅主题,这样。
1. 代码部分
按我个人理解的顺序放的,创建项目直接创建的默认springboot项目,jdk默认用的17。
1.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.selma</groupId>
<artifactId>SpringBootMqttTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringBootMqttTest</name>
<description>SpringBootMqttTest</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 小辣椒 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- MQTT有关依赖 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
1.2 application.yml
server:
port: 8199
spring:
application:
name: SpringBootMqttTest
## MQTT配置
mqtt:
host: tcp://broker.emqx.io:1883
qos: 1
clientId: mqtt_server_common
timeout: 10
keepalive: 20
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
1.3 domain.AjaxResult
用来controller返回值用的
package com.selma.domain;
import java.util.HashMap;
public class AjaxResult extends HashMap<String, Object> {
/**
* 初始化一个新创建的 Message 对象
*/
public AjaxResult() {
}
/**
* 返回错误消息
*
* @return 错误消息
*/
public static AjaxResult error() {
return error(500, "操作失败");
}
/**
* 返回错误消息
*
* @param msg 内容
* @return 错误消息
*/
public static AjaxResult error(String msg) {
return error(500, msg);
}
/**
* 返回错误消息
*
* @param code 错误码
* @param msg 内容
* @return 错误消息
*/
public static AjaxResult error(int code, String msg) {
AjaxResult json = new AjaxResult();
json.put("code", code);
json.put("msg", msg);
return json;
}
/**
* 返回成功消息
*
* @param msg 内容
* @return 成功消息
*/
public static AjaxResult success(String msg) {
AjaxResult json = new AjaxResult();
json.put("msg", msg);
json.put("code", 200);
return json;
}
/**
* 返回成功消息
*
* @return 成功消息
*/
public static AjaxResult success() {
return AjaxResult.success("操作成功");
}
public static AjaxResult success(Object value) {
return AjaxResult.successData(200, value);
}
public static AjaxResult successData(int code, Object value) {
AjaxResult json = new AjaxResult();
json.put("code", code);
json.put("data", value);
return json;
}
/**
* 返回成功消息
*
* @param key 键值
* @param value 内容
* @return 成功消息
*/
@Override
public AjaxResult put(String key, Object value) {
super.put(key, value);
return this;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
1.4 mqtt.callback.MqttMessageListenerCommon
回调方法综合类
package com.selma.mqtt.callback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
public class MqttMessageListenerCommon implements MqttCallbackExtended {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT {} 连接成功,连接方式:{}", serverURI, reconnect ? "重连" : "直连");
}
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost 连接断开: {}", throwable.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("Message received from topic {} : {}", topic, new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
1.5 mqtt.config.MqttConfiguration
mqtt对象注入config
package com.selma.mqtt.config;
import com.selma.mqtt.callback.MqttMessageListenerCommon;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfiguration {
@Value("${mqtt.host}")
String mqttHostCommon;
@Value("${mqtt.clientId}")
String mqttClientIdCommon;
@Bean
public MqttClient mqttClientCommon() {
try {
MqttClient mqttClient = new MqttClient(mqttHostCommon, mqttClientIdCommon);
mqttClient.setCallback(new MqttMessageListenerCommon());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true); // 设置为干净会话
mqttClient.connect(options);
return mqttClient;
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
1.6 mqtt.service.MqttService
mqtt有关方法(目前只有我测试的)
package com.selma.mqtt.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@Service
@Slf4j
public class MqttService {
@Resource
private MqttClient mqttClientCommon;
/**
* 项目启动后就调用方法
*
* @throws MqttException
*/
@PostConstruct
public void defaultSubscribe() {
try {
//up是传输上来的信息
this.subscribeCommon("selma/up", 0);
this.publishMessageCommon("selma/down", "hello", 0);
// } catch (MqttException mqttException) {
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 给commonClient订阅指定主题
*
* @param topic
* @param qos
*/
public void subscribeCommon(String topic, int qos) {
try {
mqttClientCommon.subscribe(topic, qos);
} catch (MqttException mqttException) {
mqttException.printStackTrace();
}
}
/**
* 给commonClient的指定主题发送指定消息
*
* @param topic
* @param msg
* @param qos
*/
public void publishMessageCommon(String topic, String msg, int qos) {
try {
byte[] msgBytes = msg.getBytes();
MqttMessage message = new MqttMessage(msgBytes);
message.setQos(qos);
mqttClientCommon.publish(topic, message);
} catch (MqttException mqttException) {
mqttException.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
1.7 mqtt.controller.MqttController
package com.selma.mqtt.controller;
import com.selma.domain.AjaxResult;
import com.selma.mqtt.service.MqttService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/selma/mqtt")
@Slf4j
public class MqttController {
@Resource
MqttService mqttService;
@PostMapping("/subscribeCommon")
public AjaxResult subscribeCommon(String topic) {
mqttService.subscribeCommon(topic, 0);
return AjaxResult.success();
}
@PostMapping("/publishMessageCommon")
public AjaxResult publishMessageCommon(String topic, String msg) {
mqttService.publishMessageCommon(topic, msg, 0);
return AjaxResult.success();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
2.测试
2.1 项目初启动即能收到消息
代码部分对应mqtt.service.MqttService.defaultSubscribe()
2.2 向已订阅的主题发送消息
已订阅的主题来源对应mqtt.service.MqttService.defaultSubscribe()
2.3 调用接口订阅主题
接口对应mqtt.controller.MqttController.subscribeCommon(…)
2.4 调用接口给主题发送信息
接口对应mqtt.controller.MqttController.publishMessageCommon(…, …)
评论记录:
回复评论: