使用RabbitMQ和Spring Cloud的MQTT客户端负载平衡

扩展您对MQTT Client和负载平衡的了解

前言

MQTT是机器对机器(M2M)的IoT连接协议。 它被设计为一种极其轻量级的发布和订阅消息传递。 对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。

每个MQTT客户端都订阅某些主题,并在发布者开始推送有关这些主题的消息时接收消息。

如何横向扩展?

水平缩放的目的是在同一应用程序的多个实例之间分配负载。 如果那些实例中的MQTT客户端订阅了相同的主题,则相同的MQTT消息将传递到每个实例,这不是所需的行为。

MQTT消息订阅
MQTT消息订阅

竞争客户端

Spring Cloud Stream将“客户端组”的概念定义如下:

虽然发布-订阅模型使通过共享主题轻松连接应用程序变得很重要,但是通过创建给定应用程序的多个实例进行扩展的能力同样重要。在这样做时,应用程序的不同实例处于竞争的消费者关系中。 ,其中只有一个实例可以处理给定消息。

根据此定义,Spring Cloud Stream允许跨多个客户端分配主题的负载,如下图所示。

MQTT订阅分配负载
MQTT订阅分配负载

示例

在此示例中,MQTT客户端将消息发布到RabbitMQ中的一个主题,并且多个使用者将共享该主题的消息。

1.安装RabbitMQ和MQTT插件

首先,我们将使用Docker映像运行RabbitMQ的实例。 然后,我们将安装MQTT插件。

docker run  -d --hostname vs29 --name vs29 -p 8081:15672 -p 5672:5672 -p 1883:1883 rabbitmq:3-management

现在,让我们检查该容器的启动日志:

>docker ps
CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                                                                                               NAMES
fbd443154bf6        rabbitmq:3-management     "docker-entrypoint.s…"   6 seconds ago       Up 2 seconds        4369/tcp, 0.0.0.0:1883->1883/tcp, 5671/tcp, 15671/tcp, 0.0.0.0:5672->5672/tcp, 25672/tcp, 0.0.0.0:8081->15672/tcp   vs29

>docker logs fbd443154bf6 -f
....
....
...

2019-02-03 07:34:16.709 [info] <0.198.0>
node : rabbit@vs29
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : O+z+vUDvSh3J1vK/lV08Xw==
log(s) : <stdout>
database dir : /var/lib/rabbitmq/mnesia/rabbit@vs29

日志的最后几行指示现在已读取服务器。 现在,让我们安装MQTT插件:

Navigate to the container first:
> docker exec -u 0 -it fbd443154bf6 /bin/bash

Enable the plugin now:
root@vs29:/#rabbitmq-plugins enable rabbitmq_mqtt

2.在RabbitMQ中添加新用户

让我们使用管理UI在RabbitMQ中添加一个新用户。

打开URL http:// RabbitMQhost:8081 /,然后导航至选项卡“ Admin”(RabbitMQ中的默认凭据为guest / guest)。

  • 在“用户名”字段中添加用户名。 然后,让我们添加用户“ client1”。 在“密码”字段中设置密码,让我们将密码设置为“ client1”
  • 点击“添加用户”
在RabbitMQ中添加新用户
在RabbitMQ中添加新用户

默认用户无权访问任何虚拟主机。 单击“ client1”以编辑该用户的权限。 在新页面中,单击“设置权限”以授予用户访问所有虚拟主机的权限。

为了验证一切正常,请使用MQTT客户端将数据推送到我们的新服务器。 在本教程中,我们将使用Mosquitto服务器提供的命令“ mosquitto_pub”和“ mosquitto_sub”。 首先,让我们订阅服务器中的所有主题。 其次,我们将一些数据推送到服务器。

..>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "Hello World" -u client1 -P client1 -p 1883
..>mosquitto_sub -h xxx.xxx.xxx.xxx -t "#" -u client1 -P client1
Hello World

如果一切顺利,您应该会收到“ Hello World”

3.创建消息接收器服务

本教程的目的是在同一应用程序的多个实例之间分配负载。 因此,让我们创建一个简单的服务来使用Spring Boot和Spring Cloud Stream消耗消息。

  • 创建一个新的Spring Boot项目。 您可以使用IDE或Spring Initializer
  • 调整您的mvn文件,使其包含以下内容:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>

现在,让我们添加流监听器(Stream Listener):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(MessageSink.InputChannel.class)
public class MessageSink {

    @StreamListener(InputChannel.SINK)
    public void handle(String message) {
System.out.println("new message:" + message + ", from worker :" + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public interface InputChannel {
        String SINK = "message-sink";

        @Input(SINK)
        SubscribableChannel sink();
    }
}

下一步是定义和配置我们的频道(这是本教程最重要的部分)。 该配置将添加到文件application.yml中:

spring:
  cloud:
    stream:
      bindings:
        message-sink :
         destination: amq.topic
         binder: rabbit
         group: messages-consumer-group
         consumer :
           concurrency: 1
      rabbit:
        bindings:
          message-sink:
            consumer:
              durableSubscription: true
              declareExchange: true
              exchangeDurable: true
              exchangeType: topic
              queueNameGroupOnly: true 
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    password: client1
    username: client1

我们来看一下application.yml中的重要配置:

destination:amq.topic是MQTT插件使用的默认Exchange,因此我们需要订阅它。

group:根据Spring Cloud Documents,“所有订阅给定目标的组都会收到一份已发布数据的副本,但每个组中只有一个成员会从该目标接收给定消息”

Consumer.concurrency:可用于处理此使用者中接收到的消息的最大线程数。我们将此数字修改为任何正值,并且仍然使用“分组消费者”的概念。

queueNameGroupOnly:根据Spring Cloud Documents,“为true时,从名称等于组的队列中使用。否则,队列名称为destination.group。例如,这在使用Spring Cloud Stream从现有RabbitMQ队列中使用时非常有用。确实,这是一个非常重要的配置。忽略此属性将在启动服务时导致许多错误,因为Spring会生成以“ amq”开头的队列名称,RabbitMQ不允许这样做。

4.验证负载分配

让我们启动该服务的两个实例,并使用MQTT客户端推送一些数据。 首先,打开命令Shell窗口,导航到您的项目源,并使用以下命令构建项目

mvn clean compile package

其次,打开两个命令Shell窗口,导航到您的项目文件夹,然后使用以下命令启动服务

java -jar target\balanced_mqtt_client-0.0.1-SNAPSHOT.jar

现在,我们将从MQTT客户端推送一些数据:

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 1" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 2" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 3" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 4" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 5" -u client1 -P client1 -p 1883

客户端将看到以下消息:

consumer 1:

2019-02-07 10:33:55.848  INFO 14284 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.messages-consumer-group
2019-02-07 10:33:55.858  INFO 14284 --- [           main] r.n.cloud.rabbitmq.mqtt.MqttApplication  : Started MqttApplication in 8.824 seconds (JVM running for 9.318)
new message:message 1, from worker :messages-consumer-group-1
new message:message 3, from worker :messages-consumer-group-1
new message:message 5, from worker :messages-consumer-group-1

consumer 2:

O 13832 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.messages-consumer-group
O 13832 --- [           main] r.n.cloud.rabbitmq.mqtt.MqttApplication  : Started MqttApplication in 7.8 seconds (JVM running for 8.495)
worker :messages-consumer-group-1
worker :messages-consumer-group-1

如我们所见,消息是在两个使用者之间分配的。

总结

本教程显示了如何使用RabbitMQ服务器和“分组使用方”功能来实现负载平衡的MQTT使用方。

SO资源郑重声明:
1. 本站所有资源来源于用户上传和网络,因此不包含技术服务请大家谅解!如有侵权请邮件联系客服!3187589@qq.com
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理,有奖励!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!

SO资源 » 使用RabbitMQ和Spring Cloud的MQTT客户端负载平衡