KafkaOplogTransport.cs 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. using System;
  2. using System.Text;
  3. using Confluent.Kafka;
  4. namespace Aivfo.OperationLog
  5. {
  6. /// <summary>
  7. /// Kafka 传输实现。复用 operate/control 现有 Confluent.Kafka 2.1.1 客户端用法
  8. /// (ProducerBuilder&lt;string,string&gt; → Produce JSON,UTF-8)。
  9. /// Producer 单例、复用;发送在后台线程,失败回调写本地兜底(由上层 pipeline 处理日志)。
  10. /// </summary>
  11. public sealed class KafkaOplogTransport : IOplogTransport
  12. {
  13. private readonly string _topic;
  14. private readonly IProducer<Null, string> _producer;
  15. private readonly Action<string> _onError; // 传输错误回调(写本地启动/错误日志)
  16. private readonly Action<string> _onDeliveryFailed; // 投递失败回调(把整条 JSON 落本地兜底,恢复后补送)
  17. public KafkaOplogTransport(string bootstrapServers, string topic,
  18. Action<string> onError = null, Action<string> onDeliveryFailed = null)
  19. {
  20. _topic = topic;
  21. _onError = onError;
  22. _onDeliveryFailed = onDeliveryFailed;
  23. var config = new ProducerConfig
  24. {
  25. BootstrapServers = bootstrapServers,
  26. // 不阻塞业务:缓冲满时本地缓冲(librdkafka queue),由后台线程驱动。
  27. MessageMaxBytes = 2097152,
  28. // 让发送失败快速回来走兜底,不要长时间挂住后台线程。
  29. MessageTimeoutMs = 10000
  30. };
  31. var builder = new ProducerBuilder<Null, string>(config);
  32. builder.SetErrorHandler((_, e) =>
  33. {
  34. // 仅记录非致命/连接类错误,避免影响业务。
  35. _onError?.Invoke($"[KafkaError] {e.Code} {e.Reason}");
  36. });
  37. _producer = builder.Build();
  38. }
  39. public void Send(string json)
  40. {
  41. // UTF-8 由 Confluent 默认 Serializers.Utf8 处理。
  42. var message = new Message<Null, string> { Value = json };
  43. try
  44. {
  45. _producer.Produce(_topic, message, report =>
  46. {
  47. if (report.Error != null && report.Error.IsError)
  48. {
  49. // broker 宕/超时等投递失败:记错误 + 落本地兜底(恢复后由 pipeline 补送,不静默丢)。
  50. _onError?.Invoke($"[KafkaDeliveryFailed] {report.Error.Reason}");
  51. _onDeliveryFailed?.Invoke(json);
  52. }
  53. });
  54. }
  55. catch (ProduceException<Null, string> ex)
  56. {
  57. // 本地 librdkafka 队列满(QueueFull)等同步异常:同样落兜底,不丢、不抛给上层。
  58. _onError?.Invoke($"[KafkaProduceError] {ex.Error.Reason}");
  59. _onDeliveryFailed?.Invoke(json);
  60. }
  61. }
  62. public void Flush(TimeSpan timeout)
  63. {
  64. try { _producer.Flush(timeout); }
  65. catch (Exception ex) { _onError?.Invoke($"[KafkaFlushError] {ex.Message}"); }
  66. }
  67. public void Dispose()
  68. {
  69. try
  70. {
  71. _producer.Flush(TimeSpan.FromSeconds(5));
  72. _producer.Dispose();
  73. }
  74. catch { /* 退出兜底,吞掉 */ }
  75. }
  76. /// <summary>UTF-8 字节(备用,当前 Confluent 已内部按 UTF-8 序列化字符串)。</summary>
  77. public static byte[] Utf8Bytes(string s) => Encoding.UTF8.GetBytes(s ?? string.Empty);
  78. }
  79. }