| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- using System;
- using System.Text;
- using Confluent.Kafka;
- namespace Aivfo.OperationLog
- {
- /// <summary>
- /// Kafka 传输实现。复用 operate/control 现有 Confluent.Kafka 2.1.1 客户端用法
- /// (ProducerBuilder<string,string> → Produce JSON,UTF-8)。
- /// Producer 单例、复用;发送在后台线程,失败回调写本地兜底(由上层 pipeline 处理日志)。
- /// </summary>
- public sealed class KafkaOplogTransport : IOplogTransport
- {
- private readonly string _topic;
- private readonly IProducer<Null, string> _producer;
- private readonly Action<string> _onError; // 传输错误回调(写本地启动/错误日志)
- private readonly Action<string> _onDeliveryFailed; // 投递失败回调(把整条 JSON 落本地兜底,恢复后补送)
- public KafkaOplogTransport(string bootstrapServers, string topic,
- Action<string> onError = null, Action<string> onDeliveryFailed = null)
- {
- _topic = topic;
- _onError = onError;
- _onDeliveryFailed = onDeliveryFailed;
- var config = new ProducerConfig
- {
- BootstrapServers = bootstrapServers,
- // 不阻塞业务:缓冲满时本地缓冲(librdkafka queue),由后台线程驱动。
- MessageMaxBytes = 2097152,
- // 让发送失败快速回来走兜底,不要长时间挂住后台线程。
- MessageTimeoutMs = 10000
- };
- var builder = new ProducerBuilder<Null, string>(config);
- builder.SetErrorHandler((_, e) =>
- {
- // 仅记录非致命/连接类错误,避免影响业务。
- _onError?.Invoke($"[KafkaError] {e.Code} {e.Reason}");
- });
- _producer = builder.Build();
- }
- public void Send(string json)
- {
- // UTF-8 由 Confluent 默认 Serializers.Utf8 处理。
- var message = new Message<Null, string> { Value = json };
- try
- {
- _producer.Produce(_topic, message, report =>
- {
- if (report.Error != null && report.Error.IsError)
- {
- // broker 宕/超时等投递失败:记错误 + 落本地兜底(恢复后由 pipeline 补送,不静默丢)。
- _onError?.Invoke($"[KafkaDeliveryFailed] {report.Error.Reason}");
- _onDeliveryFailed?.Invoke(json);
- }
- });
- }
- catch (ProduceException<Null, string> ex)
- {
- // 本地 librdkafka 队列满(QueueFull)等同步异常:同样落兜底,不丢、不抛给上层。
- _onError?.Invoke($"[KafkaProduceError] {ex.Error.Reason}");
- _onDeliveryFailed?.Invoke(json);
- }
- }
- public void Flush(TimeSpan timeout)
- {
- try { _producer.Flush(timeout); }
- catch (Exception ex) { _onError?.Invoke($"[KafkaFlushError] {ex.Message}"); }
- }
- public void Dispose()
- {
- try
- {
- _producer.Flush(TimeSpan.FromSeconds(5));
- _producer.Dispose();
- }
- catch { /* 退出兜底,吞掉 */ }
- }
- /// <summary>UTF-8 字节(备用,当前 Confluent 已内部按 UTF-8 序列化字符串)。</summary>
- public static byte[] Utf8Bytes(string s) => Encoding.UTF8.GetBytes(s ?? string.Empty);
- }
- }
|