首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

springboot集成mqtt【启动即连接服务器与订阅主题】

  • 25-03-07 10:43
  • 2931
  • 7031
blog.csdn.net

目录

  • 0. 碎碎念
  • 1. 代码部分
    • 1.1 pom.xml
    • 1.2 application.yml
    • 1.3 domain.AjaxResult
    • 1.4 mqtt.callback.MqttMessageListenerCommon
    • 1.5 mqtt.config.MqttConfiguration
    • 1.6 mqtt.service.MqttService
    • 1.7 mqtt.controller.MqttController
  • 2.测试
  • 2.1 项目初启动即能收到消息
    • 2.2 向已订阅的主题发送消息
    • 2.3 调用接口订阅主题
    • 2.4 调用接口给主题发送信息

0. 碎碎念

    是向小白版的springboot中集成mqtt服务和springboot集成mqtt(超级无敌详细)学习整理的。但是因为我想要弄成一个,不是项目一启动就连接服务器的,而且连接成功服务器就订阅好固定的主题,太菜了一直理不明白(主要是注入的问题一直有点晕)。
    目前这个是项目一启动就连接mqtt,然后连接成功后就可以订阅主题,这样。

1. 代码部分

    按我个人理解的顺序放的,创建项目直接创建的默认springboot项目,jdk默认用的17。

1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.selma</groupId>
    <artifactId>SpringBootMqttTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootMqttTest</name>
    <description>SpringBootMqttTest</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 小辣椒 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- MQTT有关依赖 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

1.2 application.yml

server:
  port: 8199

spring:
  application:
    name: SpringBootMqttTest

## MQTT配置
mqtt:
  host: tcp://broker.emqx.io:1883
  qos: 1
  clientId: mqtt_server_common
  timeout: 10
  keepalive: 20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

1.3 domain.AjaxResult

    用来controller返回值用的

package com.selma.domain;

import java.util.HashMap;

public class AjaxResult extends HashMap<String, Object> {
    /**
     * 初始化一个新创建的 Message 对象
     */
    public AjaxResult() {
    }

    /**
     * 返回错误消息
     *
     * @return 错误消息
     */
    public static AjaxResult error() {
        return error(500, "操作失败");
    }

    /**
     * 返回错误消息
     *
     * @param msg 内容
     * @return 错误消息
     */
    public static AjaxResult error(String msg) {
        return error(500, msg);
    }

    /**
     * 返回错误消息
     *
     * @param code 错误码
     * @param msg  内容
     * @return 错误消息
     */
    public static AjaxResult error(int code, String msg) {
        AjaxResult json = new AjaxResult();
        json.put("code", code);
        json.put("msg", msg);
        return json;
    }

    /**
     * 返回成功消息
     *
     * @param msg 内容
     * @return 成功消息
     */
    public static AjaxResult success(String msg) {
        AjaxResult json = new AjaxResult();
        json.put("msg", msg);
        json.put("code", 200);
        return json;
    }

    /**
     * 返回成功消息
     *
     * @return 成功消息
     */
    public static AjaxResult success() {
        return AjaxResult.success("操作成功");
    }

    public static AjaxResult success(Object value) {
        return AjaxResult.successData(200, value);
    }

    public static AjaxResult successData(int code, Object value) {
        AjaxResult json = new AjaxResult();
        json.put("code", code);
        json.put("data", value);
        return json;
    }

    /**
     * 返回成功消息
     *
     * @param key   键值
     * @param value 内容
     * @return 成功消息
     */
    @Override
    public AjaxResult put(String key, Object value) {
        super.put(key, value);
        return this;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

1.4 mqtt.callback.MqttMessageListenerCommon

    回调方法综合类

package com.selma.mqtt.callback;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class MqttMessageListenerCommon implements MqttCallbackExtended {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("MQTT {} 连接成功,连接方式:{}", serverURI, reconnect ? "重连" : "直连");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("mqtt connectionLost 连接断开: {}", throwable.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("Message received from topic {} : {}", topic, new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

1.5 mqtt.config.MqttConfiguration

    mqtt对象注入config

package com.selma.mqtt.config;

import com.selma.mqtt.callback.MqttMessageListenerCommon;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfiguration {
    @Value("${mqtt.host}")
    String mqttHostCommon;

    @Value("${mqtt.clientId}")
    String mqttClientIdCommon;

    @Bean
    public MqttClient mqttClientCommon() {
        try {
            MqttClient mqttClient = new MqttClient(mqttHostCommon, mqttClientIdCommon);
            mqttClient.setCallback(new MqttMessageListenerCommon());

            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true); // 设置为干净会话
            mqttClient.connect(options);
            return mqttClient;
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

1.6 mqtt.service.MqttService

    mqtt有关方法(目前只有我测试的)

package com.selma.mqtt.service;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;

@Service
@Slf4j
public class MqttService {

    @Resource
    private MqttClient mqttClientCommon;

    /**
     * 项目启动后就调用方法
     *
     * @throws MqttException
     */
    @PostConstruct
    public void defaultSubscribe() {
        try {
            //up是传输上来的信息
            this.subscribeCommon("selma/up", 0);
            this.publishMessageCommon("selma/down", "hello", 0);
//        } catch (MqttException mqttException) {
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 给commonClient订阅指定主题
     *
     * @param topic
     * @param qos
     */
    public void subscribeCommon(String topic, int qos) {
        try {
            mqttClientCommon.subscribe(topic, qos);
        } catch (MqttException mqttException) {
            mqttException.printStackTrace();
        }
    }

    /**
     * 给commonClient的指定主题发送指定消息
     *
     * @param topic
     * @param msg
     * @param qos
     */
    public void publishMessageCommon(String topic, String msg, int qos) {
        try {
            byte[] msgBytes = msg.getBytes();
            MqttMessage message = new MqttMessage(msgBytes);
            message.setQos(qos);
            mqttClientCommon.publish(topic, message);
        } catch (MqttException mqttException) {
            mqttException.printStackTrace();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

1.7 mqtt.controller.MqttController

package com.selma.mqtt.controller;

import com.selma.domain.AjaxResult;
import com.selma.mqtt.service.MqttService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/selma/mqtt")
@Slf4j
public class MqttController {

    @Resource
    MqttService mqttService;

    @PostMapping("/subscribeCommon")
    public AjaxResult subscribeCommon(String topic) {
        mqttService.subscribeCommon(topic, 0);
        return AjaxResult.success();
    }

    @PostMapping("/publishMessageCommon")
    public AjaxResult publishMessageCommon(String topic, String msg) {
        mqttService.publishMessageCommon(topic, msg, 0);
        return AjaxResult.success();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

2.测试

2.1 项目初启动即能收到消息

    代码部分对应mqtt.service.MqttService.defaultSubscribe()
图2.1

2.2 向已订阅的主题发送消息

    已订阅的主题来源对应mqtt.service.MqttService.defaultSubscribe()
图2.2

2.3 调用接口订阅主题

    接口对应mqtt.controller.MqttController.subscribeCommon(…)

图2.3
图2.4

2.4 调用接口给主题发送信息

    接口对应mqtt.controller.MqttController.publishMessageCommon(…, …)
图2.5

注:本文转载自blog.csdn.net的陈依劼的文章"https://blog.csdn.net/Selma003/article/details/142764383"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

121
服务器
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top