using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace Aivfo.OperationLog
{
/// <summary>
/// Asynchronous operation-log pipeline (design doc 14 §9/§11/§12).
/// Business threads enqueue without blocking into a bounded channel; one background
/// consumer loop drains it in batches and hands each message to <see cref="IOplogTransport"/>.
/// When the queue is full or a send fails, the message is written to the local fallback
/// instead of being lost. Enqueueing never blocks and never throws into business code.
/// </summary>
internal sealed class OperationLogPipeline : IDisposable
{
    /// <summary>Upper bound for each individual shutdown step in <see cref="Dispose"/>.</summary>
    private static readonly TimeSpan ShutdownStepTimeout = TimeSpan.FromSeconds(3);

    private readonly OperationLogOptions _options;
    private readonly IOplogTransport _transport;
    private readonly LocalFileWriter _local;
    private readonly Channel<OperationLogMessage> _channel;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Task _worker;

    // Messages accepted by the channel that the worker has not finished handling yet.
    // Unlike Reader.Count this also covers a batch that was dequeued but is still being sent.
    private int _pending;

    // 0 = live, 1 = disposed. An int so Dispose can claim it atomically via Interlocked.
    private int _disposed;

    /// <summary>
    /// Creates the bounded queue (capacity is at least 16) and starts the background consumer.
    /// </summary>
    /// <param name="options">Supplies <c>QueueCapacity</c> and <c>BatchSize</c>.</param>
    /// <param name="transport">Destination for serialized messages; disposed together with the pipeline.</param>
    /// <param name="local">Local writer used for fallback payloads and self-diagnostics.</param>
    public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
    {
        _options = options;
        _transport = transport;
        _local = local;

        _channel = Channel.CreateBounded<OperationLogMessage>(new BoundedChannelOptions(Math.Max(16, options.QueueCapacity))
        {
            // Wait mode is what makes TryWrite return false (without blocking) on a full queue.
            // DropWrite would report success and silently discard the item, so the fallback
            // path in Enqueue would never run.
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        });

        // Capture the token once so the loop never touches _cts after it has been disposed.
        var stopToken = _cts.Token;

        // Task.Run always uses the default scheduler and unwraps the async delegate.
        // LongRunning gives no benefit for an async loop: the dedicated thread is released
        // at the first await.
        _worker = Task.Run(() => RunLoop(stopToken));
    }

    /// <summary>
    /// Non-blocking enqueue. If the queue is full (or already completed) the message is written
    /// to the local fallback instead. Never blocks and never throws.
    /// </summary>
    /// <param name="msg">Message to log; <c>null</c> is ignored.</param>
    public void Enqueue(OperationLogMessage msg)
    {
        if (msg == null || Volatile.Read(ref _disposed) != 0)
        {
            return;
        }

        try
        {
            // Count the message before publishing it so the worker can never decrement first.
            var queued = false;
            Interlocked.Increment(ref _pending);
            try
            {
                queued = _channel.Writer.TryWrite(msg);
            }
            finally
            {
                if (!queued)
                {
                    Interlocked.Decrement(ref _pending);
                }
            }

            if (!queued)
            {
                // Queue-full degradation: operation-level logs go to the local fallback (not lost).
                // Debug-level logs are not expected here because they are not queued for Kafka.
                _local.WriteFallback(SafeJson(msg));
            }
        }
        catch (Exception ex)
        {
            ReportSelfError("Enqueue failed: " + ex.Message);
        }
    }

    /// <summary>
    /// Consumer loop: waits for data, pulls up to <c>BatchSize</c> messages and delivers them one by one.
    /// Ends when the writer is completed and the queue is empty, or when <paramref name="stopToken"/> fires.
    /// </summary>
    private async Task RunLoop(CancellationToken stopToken)
    {
        var reader = _channel.Reader;

        // A batch size of 0 would spin forever without consuming anything; a negative one
        // would throw from the List constructor. Clamp to at least one message per round.
        var batchSize = Math.Max(1, _options.BatchSize);
        var batch = new List<OperationLogMessage>(batchSize);

        try
        {
            while (await reader.WaitToReadAsync(stopToken).ConfigureAwait(false))
            {
                batch.Clear();
                while (batch.Count < batchSize && reader.TryRead(out var item))
                {
                    batch.Add(item);
                }

                foreach (var message in batch)
                {
                    Deliver(message);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Forced shutdown: whatever is still queued goes to the local fallback rather than
            // being dropped. Done here so the channel keeps a single reader.
            while (reader.TryRead(out var leftover))
            {
                SpillToFallback(leftover);
            }
        }
        catch (Exception ex)
        {
            ReportSelfError("Pipeline loop crashed: " + ex.Message);
        }
    }

    /// <summary>
    /// Sends one message through the transport; on failure writes it to the local fallback.
    /// Never throws, so a single bad message cannot terminate the consumer loop.
    /// </summary>
    private void Deliver(OperationLogMessage message)
    {
        try
        {
            var json = SafeJson(message);
            try
            {
                _transport.Send(json);
            }
            catch (Exception ex)
            {
                // Send failed: keep the payload locally (it can be re-sent later) and record the error.
                _local.WriteFallback(json);
                ReportSelfError("Transport.Send failed: " + ex.Message);
            }
        }
        catch (Exception ex)
        {
            ReportSelfError("Fallback write failed: " + ex.Message);
        }
        finally
        {
            Interlocked.Decrement(ref _pending);
        }
    }

    /// <summary>Writes one undelivered message straight to the local fallback. Never throws.</summary>
    private void SpillToFallback(OperationLogMessage message)
    {
        try
        {
            _local.WriteFallback(SafeJson(message));
        }
        catch (Exception ex)
        {
            ReportSelfError("Fallback write failed: " + ex.Message);
        }
        finally
        {
            Interlocked.Decrement(ref _pending);
        }
    }

    /// <summary>
    /// Serializes a message to JSON. On failure the error is recorded and <c>"{}"</c> is returned.
    /// </summary>
    private string SafeJson(OperationLogMessage m)
    {
        try
        {
            return JsonConvert.SerializeObject(m);
        }
        catch (Exception ex)
        {
            ReportSelfError("Message serialize failed: " + ex.Message);
            return "{}";
        }
    }

    /// <summary>
    /// Records a pipeline self-diagnostic. Failures of the diagnostic writer itself are
    /// deliberately swallowed: there is nowhere left to report them, and logging must never
    /// break the caller.
    /// </summary>
    private void ReportSelfError(string message)
    {
        try
        {
            _local.WriteSelfError(message);
        }
        catch
        {
            // Last line of defence; intentionally ignored.
        }
    }

    /// <summary>
    /// Waits (up to <paramref name="timeout"/>) until every accepted message has been handled by
    /// the worker, then flushes the transport. Intended for tests and shutdown. Never throws.
    /// </summary>
    /// <param name="timeout">Maximum time to wait for the queue to drain; also passed to the transport flush.</param>
    public void Flush(TimeSpan timeout)
    {
        try
        {
            // Stopwatch is monotonic, so wall-clock adjustments cannot stretch or cut the wait.
            var clock = System.Diagnostics.Stopwatch.StartNew();
            while (Volatile.Read(ref _pending) > 0 && !_worker.IsCompleted && clock.Elapsed < timeout)
            {
                Thread.Sleep(20);
            }

            _transport.Flush(timeout);
        }
        catch (Exception ex)
        {
            ReportSelfError("Flush failed: " + ex.Message);
        }
    }

    /// <summary>
    /// Stops accepting messages, lets the worker drain the queue, flushes and disposes the transport.
    /// Safe to call more than once and from multiple threads; never throws.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            // Completing the writer makes the loop end on its own once the queue is empty.
            _channel.Writer.TryComplete();
            if (!_worker.Wait(ShutdownStepTimeout))
            {
                // Too slow: cancel so the worker spills the remainder to the local fallback.
                _cts.Cancel();
                _worker.Wait(ShutdownStepTimeout);
            }
        }
        catch (Exception ex)
        {
            ReportSelfError("Dispose drain failed: " + ex.Message);
        }

        // Flush only after the worker has stopped sending, so the last batch is included.
        try
        {
            _transport.Flush(ShutdownStepTimeout);
        }
        catch (Exception ex)
        {
            ReportSelfError("Flush failed: " + ex.Message);
        }

        try
        {
            _transport.Dispose();
        }
        catch (Exception ex)
        {
            ReportSelfError("Transport dispose failed: " + ex.Message);
        }

        _cts.Dispose();
    }
}
}