| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- using ivf_tl_Entity.GlobalEnums;
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Protocol;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace ivf_tl_Services
- {
/// <summary>
/// Wraps a single MQTTnet client: connects to the configured broker, subscribes to
/// one topic once connected, forwards received payloads as UTF-8 text through
/// <see cref="MessEvent"/>, and reconnects after the connection is lost.
/// Diagnostics are reported through <see cref="ErrorLogEvent"/> and
/// <see cref="ExceptionLogEvent"/> rather than thrown.
/// </summary>
public class MqttHelper
{
    /// <summary>Topic used by <see cref="PublishAsync(string)"/>.</summary>
    private const string DefaultPublishTopic = "TL/House/collecting-data";

    /// <summary>Number of connection attempts made before the long back-off.</summary>
    private const int AttemptsPerRound = 3;

    /// <summary>Pause after a failed attempt, in milliseconds.</summary>
    private const int AttemptDelayMs = 1000;

    /// <summary>Pause after a whole round of failed attempts, in milliseconds.</summary>
    private const int RoundDelayMs = 10000;

    /// <summary>Raised with the UTF-8 decoded payload of every received message.</summary>
    public event Action<string> MessEvent;

    /// <summary>Raised with an informational or error message and its log category.</summary>
    public event Action<string, LogEnum> ErrorLogEvent;

    /// <summary>Raised with a caught exception, a context description and its log category.</summary>
    public event Action<Exception, string, LogEnum> ExceptionLogEvent;

    private readonly string _ip;
    private readonly int _port;
    private readonly string _username;
    private readonly string _password;
    private readonly string _clientId;
    private readonly string _topicName;

    // Serializes connection rounds so that two callers never run ConnectAsync on the client at once.
    private readonly object _connectGate = new object();

    // Accessed from the caller's thread and from pool threads; always go through
    // Volatile/Interlocked. A null value means "not started" or "disposed".
    private IMqttClient _mqttClient;
    private MqttClientOptions clientOptions;

    /// <summary>
    /// True after a successful connect, false once a disconnect has been observed or the
    /// client has been released. Volatile because it is written from pool threads.
    /// </summary>
    public volatile bool IsConnect = false;

    /// <summary>
    /// Stores the connection settings. Nothing is connected until <see cref="StartMqtt"/> is called.
    /// </summary>
    /// <param name="ip">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="username">User name sent with the connect request.</param>
    /// <param name="password">Password sent with the connect request.</param>
    /// <param name="clientId">MQTT client identifier.</param>
    /// <param name="topicName">Topic subscribed to after every successful connect.</param>
    public MqttHelper(string ip, int port, string username, string password, string clientId, string topicName)
    {
        _ip = ip;
        _port = port;
        _username = username;
        _password = password;
        _clientId = clientId;
        _topicName = topicName;
    }

    /// <summary>Common prefix identifying this client in log messages.</summary>
    private string LogPrefix
    {
        get { return $"[clientId:{_clientId}][_topicName:{_topicName}][_ip:{_ip}][_port:{_port}]"; }
    }

    /// <summary>
    /// Disposes the current client, if any, and stops any reconnection in progress.
    /// Safe to call before <see cref="StartMqtt"/> and safe to call more than once.
    /// </summary>
    public void DisPoseMqtt()
    {
        ReleaseClient();
    }

    /// <summary>
    /// Creates the client, wires its events and starts connecting on a pool thread.
    /// Calling it again replaces (and disposes) the previous client.
    /// Failures are reported through <see cref="ExceptionLogEvent"/>.
    /// </summary>
    public void StartMqtt()
    {
        try
        {
            // A repeated start must not leak the previous client or leave its handlers attached.
            ReleaseClient();

            clientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(_ip, _port)
                .WithCredentials(_username, _password)
                .WithClientId(_clientId)
                .WithCleanSession()
                .WithoutPacketFragmentation()
                .WithTls(new MqttClientOptionsBuilderTlsParameters()
                {
                    UseTls = false,
                })
                .Build();

            IMqttClient client = new MqttFactory().CreateMqttClient();
            client.ConnectedAsync += _mqttClient_ConnectedAsync;
            client.DisconnectedAsync += _mqttClient_DisconnectedAsync;
            client.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
            Volatile.Write(ref _mqttClient, client);

            // Connecting blocks (retries and sleeps), so keep it off the caller's thread.
            Task.Run(() => ConnectUntilSuccessful(client));
        }
        catch (Exception ex)
        {
            ExceptionLogEvent?.Invoke(ex, "MQTT初始化", LogEnum.RunException);
        }
    }

    /// <summary>
    /// Blocks until the current client is connected, retrying in rounds of three attempts
    /// with a longer pause between rounds. Returns early if the client is disposed or
    /// replaced. Reports an error and returns if no client has been started.
    /// </summary>
    public void ClientMqtt()
    {
        IMqttClient client = Volatile.Read(ref _mqttClient);
        if (client == null)
        {
            ExceptionLogEvent?.Invoke(
                new InvalidOperationException("ClientMqtt requires a client created by StartMqtt."),
                $"{LogPrefix} 连接MQTT服务器",
                LogEnum.RunException);
            return;
        }

        ConnectUntilSuccessful(client);
    }

    /// <summary>
    /// Retry loop behind <see cref="ClientMqtt"/>. Written as a loop (not recursion) so an
    /// unreachable broker cannot grow the stack, and tied to one client instance so that
    /// disposing or restarting ends it.
    /// </summary>
    private void ConnectUntilSuccessful(IMqttClient client)
    {
        while (IsCurrentClient(client))
        {
            lock (_connectGate)
            {
                // Another caller may have connected while this one waited for the gate.
                if (client.IsConnected)
                {
                    IsConnect = true;
                    return;
                }

                if (TryConnectRound(client))
                {
                    return;
                }
            }

            if (!IsCurrentClient(client))
            {
                return;
            }

            Thread.Sleep(RoundDelayMs);
        }
    }

    /// <summary>
    /// Makes up to <see cref="AttemptsPerRound"/> connection attempts.
    /// Returns true as soon as the broker accepts the connection.
    /// </summary>
    private bool TryConnectRound(IMqttClient client)
    {
        for (int attempt = 1; attempt <= AttemptsPerRound; attempt++)
        {
            if (!IsCurrentClient(client))
            {
                return false;
            }

            ErrorLogEvent?.Invoke($"{LogPrefix} 第{attempt}次连接服务器", LogEnum.MqttClient);
            try
            {
                var result = client.ConnectAsync(clientOptions).GetAwaiter().GetResult();
                if (result == null)
                {
                    ErrorLogEvent?.Invoke($"{LogPrefix} 第{attempt}次连接服务器失败,返回空", LogEnum.RunError);
                }
                else if (result.ResultCode == MqttClientConnectResultCode.Success)
                {
                    // Do not flag a client that was disposed while the connect was in flight.
                    if (IsCurrentClient(client))
                    {
                        IsConnect = true;
                    }

                    return true;
                }
                else
                {
                    ErrorLogEvent?.Invoke($"{LogPrefix} 第{attempt}次连接服务器失败{result.ResultCode}", LogEnum.RunError);
                }
            }
            catch (Exception ex)
            {
                ExceptionLogEvent?.Invoke(ex, $"{LogPrefix} 第{attempt}次连接MQTT服务器", LogEnum.RunException);
            }

            Thread.Sleep(AttemptDelayMs);
        }

        return false;
    }

    /// <summary>True while <paramref name="client"/> is still the client owned by this helper.</summary>
    private bool IsCurrentClient(IMqttClient client)
    {
        return ReferenceEquals(client, Volatile.Read(ref _mqttClient));
    }

    /// <summary>
    /// Detaches the handlers from the current client and disposes it. Clearing the field
    /// first is what tells a running connect loop to stop.
    /// </summary>
    private void ReleaseClient()
    {
        IMqttClient client = Interlocked.Exchange(ref _mqttClient, null);
        IsConnect = false;
        if (client == null)
        {
            return;
        }

        // Detach before disposing so the teardown cannot trigger a reconnect.
        client.ConnectedAsync -= _mqttClient_ConnectedAsync;
        client.DisconnectedAsync -= _mqttClient_DisconnectedAsync;
        client.ApplicationMessageReceivedAsync -= _mqttClient_ApplicationMessageReceivedAsync;
        client.Dispose();
    }

    /// <summary>
    /// Handles a received message: decodes the payload as UTF-8 and raises <see cref="MessEvent"/>.
    /// Exceptions thrown by subscribers are reported through <see cref="ExceptionLogEvent"/>.
    /// </summary>
    /// <param name="arg">Event data carrying the received application message.</param>
    /// <returns>A completed task.</returns>
    private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        try
        {
            MessEvent?.Invoke(Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment));
        }
        catch (Exception ex)
        {
            ExceptionLogEvent?.Invoke(ex, "MQTT消息处理", LogEnum.RunException);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles loss of the connection: logs it, clears <see cref="IsConnect"/> and starts
    /// reconnecting. Ignored while <see cref="IsConnect"/> is false, so a failed connection
    /// attempt does not start a second reconnect loop.
    /// </summary>
    /// <param name="arg">Event data describing the disconnect.</param>
    /// <returns>A completed task; reconnection continues in the background.</returns>
    private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
    {
        try
        {
            if (!IsConnect)
            {
                return Task.CompletedTask;
            }

            ErrorLogEvent?.Invoke($"{LogPrefix}客户端已断开与服务端的连接……", LogEnum.MqttClient);
            ErrorLogEvent?.Invoke($"{LogPrefix}客户端已断开与服务端的连接……", LogEnum.RunError);
            IsConnect = false;

            // Reconnecting blocks for as long as the broker is away; run it on the pool
            // instead of holding up the client's event callback.
            IMqttClient client = Volatile.Read(ref _mqttClient);
            if (client != null)
            {
                Task.Run(() => ConnectUntilSuccessful(client));
            }
        }
        catch (Exception ex)
        {
            ExceptionLogEvent?.Invoke(ex, "MQTT客户端连接关闭", LogEnum.RunException);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles a successful connect: logs it and subscribes to the configured topic.
    /// The subscription is awaited so that a failure is reported instead of being lost.
    /// </summary>
    /// <param name="arg">Event data describing the connect result.</param>
    /// <returns>A task that completes when the subscribe request has finished.</returns>
    private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
    {
        // MQTT quality-of-service levels:
        // 0: at most once - the receiver does not acknowledge and the sender does not store or
        //    resend the message; the guarantee is the same as the underlying TCP connection.
        // 1: at least once - the sender stores the message until the receiver acknowledges it;
        //    a message may be delivered more than once.
        // 2: exactly once - the safest and slowest level, using at least two request/response
        //    pairs (a four-part handshake) between sender and receiver.
        try
        {
            ErrorLogEvent?.Invoke($"{LogPrefix}客户端连接服务器成功", LogEnum.MqttClient);

            IMqttClient client = Volatile.Read(ref _mqttClient);
            if (client == null)
            {
                return;
            }

            await client.SubscribeAsync(_topicName, MqttQualityOfServiceLevel.ExactlyOnce).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            ExceptionLogEvent?.Invoke(ex, $"MQTT订阅{_topicName}", LogEnum.RunException);
        }
    }

    /// <summary>
    /// Publishes <paramref name="data"/> as UTF-8 text to the default topic with QoS 2.
    /// Failures are reported through <see cref="ExceptionLogEvent"/>, not thrown.
    /// </summary>
    /// <param name="data">Text payload to send.</param>
    /// <returns>A task that completes when the publish has finished or failed.</returns>
    public Task PublishAsync(string data)
    {
        return PublishAsync(DefaultPublishTopic, data);
    }

    /// <summary>
    /// Publishes <paramref name="data"/> as UTF-8 text to <paramref name="topic"/> with QoS 2.
    /// Failures, including publishing before <see cref="StartMqtt"/>, are reported through
    /// <see cref="ExceptionLogEvent"/>, not thrown.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="data">Text payload to send.</param>
    /// <returns>A task that completes when the publish has finished or failed.</returns>
    public async Task PublishAsync(string topic, string data)
    {
        try
        {
            IMqttClient client = Volatile.Read(ref _mqttClient);
            if (client == null)
            {
                throw new InvalidOperationException("The MQTT client has not been started or was disposed.");
            }

            var message = new MqttApplicationMessage
            {
                Topic = topic,
                PayloadSegment = Encoding.UTF8.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,
                // Whether the broker retains the message; when true, a newly connected
                // subscriber receives it immediately.
                Retain = false,
            };
            await client.PublishAsync(message);
        }
        catch (Exception ex)
        {
            ExceptionLogEvent?.Invoke(ex, $"MQTT发送消息", LogEnum.RunException);
        }
    }
}
- }
|