使用.NET C#MQTT客户端库将MQTT发布消息和订阅消息到MQTT Broker(CloudMQTT)

最近,我正在评估一些.NET C#MQTT客户端库。 经过一些研究,我发现了以下有趣的.NET C#MQTT客户端库:

  • MqttDotNet
  • paho.mqtt.m2mqtt
  • MQTTnet

经过评估,我发现MQTTnet是涵盖我所有用例的一个。 在本文中,我将分享我们如何使用MQTTnet .NET C#MQTT客户端库向MQTT Broker发布和订阅消息。 我将在本文中使用CloudMQTT MQTT Broker Free Instance。

添加“ MQTTnet.Extensions.ManagedClient” Nuget程序包

MQTTnet.Extensions.ManagedClient
MQTTnet.Extensions.ManagedClient

使用托管客户端的好处

  • 自动重新连接
  • 内部排队支持
  • 支持存储,即使在应用程序重新启动后也可以发送消息
  • 与服务器断开连接后,无需手动订阅

连接到MQTT代理(CloudMQTT)

/// <summary>
/// Connect to broker.
/// </summary>
/// <returns>Task.</returns>
public static async Task ConnectAsync()
{
  string clientId = Guid.NewGuid().ToString();
  string mqttURI = {REPLACE THIS WITH YOUR MQTT SERVER URI HERE}
  string mqttUser = { REPLACE THIS WITH YOUR MQTT USER HERE }
  string mqttPassword = { REPLACE THIS WITH YOUR MQTT PASSWORD HERE }
  int mqttPort = { REPLACE THIS WITH YOUR MQTT PORT HERE }
  bool mqttSecure = {IF YOU ARE USING SSL Port THEN SET true OTHERWISE SET false}

  var messageBuilder = new MqttClientOptionsBuilder()
    .WithClientId(clientId)
    .WithCredentials(mqttUser, mqttPassword)
    .WithTcpServer(mqttURI, mqttPort)
    .WithCleanSession();

  var options = mqttSecure
    ? messageBuilder
      .WithTls()
      .Build()
    : messageBuilder
      .Build();

  var managedOptions = new ManagedMqttClientOptionsBuilder()
    .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
    .WithClientOptions(options)
    .Build();

  client = new MqttFactory().CreateManagedMqttClient();

  await client.StartAsync(managedOptions);
}

设置CloudMQTT Broker实例并获取连接所需的实例信息

1.创建新实例

创建CloudMQTT实例
创建CloudMQTT实例

2.选择计划和名称

CloudMQTT-设置计划和名称
CloudMQTT-设置计划和名称

3.选择地区和数据中心

CloudMQTT-选择地区和数据中心
CloudMQTT-选择地区和数据中心

4.确认新实例

CloudMQTT-确认新实例
CloudMQTT-确认新实例

5.选择新创建的实例

CloudMQTT-选择新创建的实例
CloudMQTT-选择新创建的实例

6.获取实例信息

CloudMQTT-获取实例信息
CloudMQTT-获取实例信息

将消息发布到MQTT代理(CloudMQTT)

/// <summary>
/// Publish Message.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="payload">Payload.</param>
/// <param name="retainFlag">Retain flag.</param>
/// <param name="qos">Quality of Service.</param>
/// <returns>Task.</returns>
public static async Task PublishAsync(string topic, string payload, bool retainFlag = true, int qos = 1) => 
  await client.PublishAsync(new MqttApplicationMessageBuilder()
    .WithTopic(topic)
    .WithPayload(payload)
    .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
    .WithRetainFlag(retainFlag)
    .Build());

在这里,服务质量(QoS)可以是:

  • 最多一次(0)
  • 至少一次(1)
  • 恰好一次(2)

保留消息标志可以为真/假

发布时,我们可以通过将保留消息标志设置为true来告诉MQTT代理保留有关该主题的最后一条消息。

订阅来自MQTT代理(CloudMQTT)的消息

/// <summary>
/// Subscribe topic.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="qos">Quality of Service.</param>
/// <returns>Task.</returns>
public static async Task SubscribeAsync(string topic, int qos = 1) =>
  await client.SubscribeAsync(new TopicFilterBuilder()
    .WithTopic(topic)
    .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
    .Build());

处理事件

连接Handler

client.UseConnectedHandler(e =>
{
  Console.WriteLine("Connected successfully with MQTT Brokers.");
});

关闭连接

client.UseDisconnectedHandler(e =>
{
  Console.WriteLine("Disconnected from MQTT Brokers.");
});

接收消息

client.UseApplicationMessageReceivedHandler(e =>
{
  try
  {
    string topic = e.ApplicationMessage.Topic;

    if (string.IsNullOrWhiteSpace(topic) == false)
    {
      string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
      Console.WriteLine($"Topic: {topic}. Message Received: {payload}");
    }
  }
  catch (Exception ex)
  {
    Console.WriteLine(ex.Message, ex);
  }
});

在MQTT Broker(CloudMQTT)上检查已发布的消息

在MQTT Broker(CloudMQTT)上检查已发布的消息
在MQTT Broker(CloudMQTT)上检查已发布的消息
SO资源郑重声明:
1. 本站所有资源来源于用户上传和网络,因此不包含技术服务请大家谅解!如有侵权请邮件联系客服!3187589@qq.com
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理,有奖励!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!

SO资源 » 使用.NET C#MQTT客户端库将MQTT发布消息和订阅消息到MQTT Broker(CloudMQTT)