KafkaOplogTransport.cs 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. using System;
  2. using System.Text;
  3. using Confluent.Kafka;
  4. namespace Aivfo.OperationLog
  5. {
  6. /// <summary>
  7. /// Kafka 传输实现。复用 operate/control 现有 Confluent.Kafka 2.1.1 客户端用法
  8. /// (ProducerBuilder&lt;string,string&gt; → Produce JSON,UTF-8)。
  9. /// Producer 单例、复用;发送在后台线程,失败回调写本地兜底(由上层 pipeline 处理日志)。
  10. /// </summary>
  11. public sealed class KafkaOplogTransport : IOplogTransport
  12. {
  13. private readonly string _topic;
  14. private readonly IProducer<Null, string> _producer;
  15. private readonly Action<string> _onError; // 传输错误回调(写本地启动/错误日志)
  16. public KafkaOplogTransport(string bootstrapServers, string topic, Action<string> onError = null)
  17. {
  18. _topic = topic;
  19. _onError = onError;
  20. var config = new ProducerConfig
  21. {
  22. BootstrapServers = bootstrapServers,
  23. // 不阻塞业务:缓冲满时本地缓冲(librdkafka queue),由后台线程驱动。
  24. MessageMaxBytes = 2097152,
  25. // 让发送失败快速回来走兜底,不要长时间挂住后台线程。
  26. MessageTimeoutMs = 10000
  27. };
  28. var builder = new ProducerBuilder<Null, string>(config);
  29. builder.SetErrorHandler((_, e) =>
  30. {
  31. // 仅记录非致命/连接类错误,避免影响业务。
  32. _onError?.Invoke($"[KafkaError] {e.Code} {e.Reason}");
  33. });
  34. _producer = builder.Build();
  35. }
  36. public void Send(string json)
  37. {
  38. // UTF-8 由 Confluent 默认 Serializers.Utf8 处理。
  39. var message = new Message<Null, string> { Value = json };
  40. _producer.Produce(_topic, message, report =>
  41. {
  42. if (report.Error != null && report.Error.IsError)
  43. {
  44. _onError?.Invoke($"[KafkaDeliveryFailed] {report.Error.Reason} | {json}");
  45. }
  46. });
  47. }
  48. public void Flush(TimeSpan timeout)
  49. {
  50. try { _producer.Flush(timeout); }
  51. catch (Exception ex) { _onError?.Invoke($"[KafkaFlushError] {ex.Message}"); }
  52. }
  53. public void Dispose()
  54. {
  55. try
  56. {
  57. _producer.Flush(TimeSpan.FromSeconds(5));
  58. _producer.Dispose();
  59. }
  60. catch { /* 退出兜底,吞掉 */ }
  61. }
  62. /// <summary>UTF-8 字节(备用,当前 Confluent 已内部按 UTF-8 序列化字符串)。</summary>
  63. public static byte[] Utf8Bytes(string s) => Encoding.UTF8.GetBytes(s ?? string.Empty);
  64. }
  65. }