如何用MQTT协议实现消息的订阅接收?
admin
2023-02-20 04:40:05
0

MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:

  1. 引入相关的依赖

    org.springframework.boot
    spring-boot-starter-integration


    org.springframework.integration
    spring-integration-mqtt



    org.projectlombok
    lombok
    true
  1. 在application.yml配置MQTT服务器信息
server:
  port: 8090

mqtt:
  host: tcp://127.0.0.1:1883
  clientinid: mqttinId
  clientoutid: mqttoutid
  topic: virus
  qoslevel: 1
  #MQTT 认证
  username: xxx
  password: xxx
  timeout: 10000
  #20s
  keepalive: 20
  1. 配置MQTT消息推送配置
package com.favccxx.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@Slf4j
@Configuration
@IntegrationComponentScan
public class MQTTReceiveConfig {

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String hostUrl;

    @Value("${mqtt.clientinid}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    @Value("${mqtt.timeout}")
    private int completionTimeout ;   //连接超时

    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptions());
        return factory;
    }

    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                        defaultTopic);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message message) throws MessagingException {
                log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
            }
        };
    }

}

相关内容

热门资讯

安徽省委常委、合肥市委书记费高... 安徽省委常委、合肥市委书记费高云涉嫌严重违纪违法,目前正接受中央纪委国家监委纪律审查和监察调查。
蓝绿对决白热化!民进党负面选战... 离台湾县市长选举还有半年时间,蓝绿之间的博弈日渐白热化。继民进党民代沈伯洋拿“鼠患”文章攻击台北市长...
俄安全局逮捕多名为乌收集情报的... 总台记者当地时间6日获悉,俄罗斯联邦安全局消息称,抓获5名为乌克兰情报机构工作的特工,包括4名俄罗斯...
美以官员:若僵局持续,特朗普本... 美伊目前虽然维持停火局面,但是外交谈判迟迟没有进展。美国和以色列官员说,僵局若持续,美国总统特朗普本...
警方介入女游客高空秋千坠亡,家... 近日,女游客体验高空秋千坠亡事件引发关注。5月5日,四川省广安市华蓥市“5·3”事故调查组发布情况通...
存储涨价之后,如何让AI走向数... 国家数据局数据显示,从2024年到2026年,中国日均Token调用量从1000亿飙升至140万亿(...
华为MatePad Pro M... IT之家 5 月 6 日消息,华为海外账号 Huawei Mobile 今日公布了一款华为 Mate...
万象 一体化防爆气象仪:做高危... WX-FBQ2万象 一体化防爆气象仪:做高危环境的“硬核守护者” 在石油化工、油气储备及煤矿矿井等高...
母亲节好礼推荐 三星Galax... 母亲节悄然临近,一份兼具心意与实用性的礼物,成为子女们表达感恩的最佳载体。在智能手机成为生活必需品的...
视频丨联合国举行AI主题会议 ... 日,中国、赞比亚常驻联合国代表团和中国科学技术协会在纽约联合国总部共同举办“人工智能能力建设国际合作...