using System; using System.Text; using Confluent.Kafka; namespace Aivfo.OperationLog { /// /// Kafka 传输实现。复用 operate/control 现有 Confluent.Kafka 2.1.1 客户端用法 /// (ProducerBuilder<string,string> → Produce JSON,UTF-8)。 /// Producer 单例、复用;发送在后台线程,失败回调写本地兜底(由上层 pipeline 处理日志)。 /// public sealed class KafkaOplogTransport : IOplogTransport { private readonly string _topic; private readonly IProducer _producer; private readonly Action _onError; // 传输错误回调(写本地启动/错误日志) private readonly Action _onDeliveryFailed; // 投递失败回调(把整条 JSON 落本地兜底,恢复后补送) public KafkaOplogTransport(string bootstrapServers, string topic, Action onError = null, Action onDeliveryFailed = null) { _topic = topic; _onError = onError; _onDeliveryFailed = onDeliveryFailed; var config = new ProducerConfig { BootstrapServers = bootstrapServers, // 不阻塞业务:缓冲满时本地缓冲(librdkafka queue),由后台线程驱动。 MessageMaxBytes = 2097152, // 让发送失败快速回来走兜底,不要长时间挂住后台线程。 MessageTimeoutMs = 10000 }; var builder = new ProducerBuilder(config); builder.SetErrorHandler((_, e) => { // 仅记录非致命/连接类错误,避免影响业务。 _onError?.Invoke($"[KafkaError] {e.Code} {e.Reason}"); }); _producer = builder.Build(); } public void Send(string json) { // UTF-8 由 Confluent 默认 Serializers.Utf8 处理。 var message = new Message { Value = json }; try { _producer.Produce(_topic, message, report => { if (report.Error != null && report.Error.IsError) { // broker 宕/超时等投递失败:记错误 + 落本地兜底(恢复后由 pipeline 补送,不静默丢)。 _onError?.Invoke($"[KafkaDeliveryFailed] {report.Error.Reason}"); _onDeliveryFailed?.Invoke(json); } }); } catch (ProduceException ex) { // 本地 librdkafka 队列满(QueueFull)等同步异常:同样落兜底,不丢、不抛给上层。 _onError?.Invoke($"[KafkaProduceError] {ex.Error.Reason}"); _onDeliveryFailed?.Invoke(json); } } public void Flush(TimeSpan timeout) { try { _producer.Flush(timeout); } catch (Exception ex) { _onError?.Invoke($"[KafkaFlushError] {ex.Message}"); } } public void Dispose() { try { _producer.Flush(TimeSpan.FromSeconds(5)); _producer.Dispose(); } catch { /* 退出兜底,吞掉 */ } } /// UTF-8 字节(备用,当前 Confluent 已内部按 UTF-8 序列化字符串)。 public static byte[] Utf8Bytes(string s) => Encoding.UTF8.GetBytes(s ?? string.Empty); } }