如何用MQTT协议实现消息的订阅接收?
admin
2023-02-20 04:40:05
0

MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:

  1. 引入相关的依赖

    org.springframework.boot
    spring-boot-starter-integration


    org.springframework.integration
    spring-integration-mqtt



    org.projectlombok
    lombok
    true
  1. 在application.yml配置MQTT服务器信息
server:
  port: 8090

mqtt:
  host: tcp://127.0.0.1:1883
  clientinid: mqttinId
  clientoutid: mqttoutid
  topic: virus
  qoslevel: 1
  #MQTT 认证
  username: xxx
  password: xxx
  timeout: 10000
  #20s
  keepalive: 20
  1. 配置MQTT消息推送配置
package com.favccxx.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@Slf4j
@Configuration
@IntegrationComponentScan
public class MQTTReceiveConfig {

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String hostUrl;

    @Value("${mqtt.clientinid}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    @Value("${mqtt.timeout}")
    private int completionTimeout ;   //连接超时

    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptions());
        return factory;
    }

    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                        defaultTopic);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message message) throws MessagingException {
                log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
            }
        };
    }

}

相关内容

热门资讯

【第一财经】“人人乐麻将.可以... 【第一财经】“人人乐麻将.可以开挂吗?”原来真的有挂您好,人人乐麻将这个游戏其实有挂的,确实是有挂的...
【第一财经】“aapoker.... 家人们!今天小编来为大家解答aapoker透视挂怎么安装这个问题咨询软件客服徽9784099的挂在哪...
终于懂了“科乐填大坑.怎么开挂... 终于懂了“科乐填大坑.怎么开挂?”透视曝光猫腻您好,科乐填大坑这个游戏其实有挂的,确实是有挂的,需要...
【今日要闻】“微乐云南麻将.开... 有 亲,根据资深记者爆料微乐云南麻将是可以开挂的,确实有挂(咨询软件无需...
今日重大发现“九线拉王.到底有... 家人们!今天小编来为大家解答九线拉王透视挂怎么安装这个问题咨询软件客服徽9784099的挂在哪里买很...
【第一消息】“十三十三水.辅助... 家人们!今天小编来为大家解答十三十三水透视挂怎么安装这个问题咨询软件客服徽9784099的挂在哪里买...
今日重大发现“福建兄弟十三水.... 网上科普关于“福建兄弟十三水有没有挂”话题很是火热,小编也是针对福建兄弟十三水作*弊开挂的方法以及开...
重磅消息“开心休闲.到底有挂吗... 家人们!今天小编来为大家解答开心休闲透视挂怎么安装这个问题咨询软件客服徽9752949的挂在哪里买很...
【第一财经】“火神牛牛.辅助器... 网上科普关于“火神牛牛有没有挂”话题很是火热,小编也是针对火神牛牛作*弊开挂的方法以及开挂对应的知识...
【今日要闻】“功夫熊猫炸金花.... 家人们!今天小编来为大家解答功夫熊猫炸金花透视挂怎么安装这个问题咨询软件客服徽4282891的挂在哪...