using System;
using System.Text;
using Confluent.Kafka;
namespace Aivfo.OperationLog
{
///
/// Kafka 传输实现。复用 operate/control 现有 Confluent.Kafka 2.1.1 客户端用法
/// (ProducerBuilder<string,string> → Produce JSON,UTF-8)。
/// Producer 单例、复用;发送在后台线程,失败回调写本地兜底(由上层 pipeline 处理日志)。
///
public sealed class KafkaOplogTransport : IOplogTransport
{
private readonly string _topic;
private readonly IProducer _producer;
private readonly Action _onError; // 传输错误回调(写本地启动/错误日志)
private readonly Action _onDeliveryFailed; // 投递失败回调(把整条 JSON 落本地兜底,恢复后补送)
public KafkaOplogTransport(string bootstrapServers, string topic,
Action onError = null, Action onDeliveryFailed = null)
{
_topic = topic;
_onError = onError;
_onDeliveryFailed = onDeliveryFailed;
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers,
// 不阻塞业务:缓冲满时本地缓冲(librdkafka queue),由后台线程驱动。
MessageMaxBytes = 2097152,
// 让发送失败快速回来走兜底,不要长时间挂住后台线程。
MessageTimeoutMs = 10000
};
var builder = new ProducerBuilder(config);
builder.SetErrorHandler((_, e) =>
{
// 仅记录非致命/连接类错误,避免影响业务。
_onError?.Invoke($"[KafkaError] {e.Code} {e.Reason}");
});
_producer = builder.Build();
}
public void Send(string json)
{
// UTF-8 由 Confluent 默认 Serializers.Utf8 处理。
var message = new Message { Value = json };
try
{
_producer.Produce(_topic, message, report =>
{
if (report.Error != null && report.Error.IsError)
{
// broker 宕/超时等投递失败:记错误 + 落本地兜底(恢复后由 pipeline 补送,不静默丢)。
_onError?.Invoke($"[KafkaDeliveryFailed] {report.Error.Reason}");
_onDeliveryFailed?.Invoke(json);
}
});
}
catch (ProduceException ex)
{
// 本地 librdkafka 队列满(QueueFull)等同步异常:同样落兜底,不丢、不抛给上层。
_onError?.Invoke($"[KafkaProduceError] {ex.Error.Reason}");
_onDeliveryFailed?.Invoke(json);
}
}
public void Flush(TimeSpan timeout)
{
try { _producer.Flush(timeout); }
catch (Exception ex) { _onError?.Invoke($"[KafkaFlushError] {ex.Message}"); }
}
public void Dispose()
{
try
{
_producer.Flush(TimeSpan.FromSeconds(5));
_producer.Dispose();
}
catch { /* 退出兜底,吞掉 */ }
}
/// UTF-8 字节(备用,当前 Confluent 已内部按 UTF-8 序列化字符串)。
public static byte[] Utf8Bytes(string s) => Encoding.UTF8.GetBytes(s ?? string.Empty);
}
}