using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Aivfo.OperationLog
{
    /// <summary>
    /// Asynchronous operation-log pipeline (design doc 14 §9/§11/§12).
    /// Business threads enqueue without blocking into a bounded channel; a single background
    /// worker dequeues in batches and hands each message to <see cref="IOplogTransport"/>.
    /// When the queue is full, or sending fails, the message is written to the local fallback
    /// instead of being dropped. Enqueueing never blocks and never throws into business code.
    /// </summary>
    internal sealed class OperationLogPipeline : IDisposable
    {
        private readonly OperationLogOptions _options;
        private readonly IOplogTransport _transport;
        private readonly LocalFileWriter _local;
        private readonly Channel<OperationLogMessage> _channel;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private readonly Task _worker;
        private volatile bool _disposed;

        // Number of messages accepted by Enqueue that the worker has not finished handling yet
        // (still queued OR already dequeued into the current batch). Flush waits on this rather
        // than on the channel count, which cannot see in-flight batch items.
        private int _pending;

        /// <summary>Creates the pipeline and starts the background worker.</summary>
        /// <param name="options">Pipeline options; <c>QueueCapacity</c> is clamped to at least 16.</param>
        /// <param name="transport">Transport that receives each serialized message.</param>
        /// <param name="local">Local writer used for fallback payloads and self-diagnostics.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
        {
            _options = options ?? throw new ArgumentNullException(nameof(options));
            _transport = transport ?? throw new ArgumentNullException(nameof(transport));
            _local = local ?? throw new ArgumentNullException(nameof(local));

            _channel = Channel.CreateBounded<OperationLogMessage>(
                new BoundedChannelOptions(Math.Max(16, options.QueueCapacity))
                {
                    // Wait mode is what makes the non-blocking TryWrite return false on a full
                    // queue so we can fall back locally. DropWrite would report success while
                    // silently discarding the message, so the fallback would never run.
                    FullMode = BoundedChannelFullMode.Wait,
                    SingleReader = true,
                    SingleWriter = false
                });

            // Task.Run always uses the default scheduler; LongRunning buys nothing for an async
            // delegate because the dedicated thread is released at the first await.
            _worker = Task.Run(() => RunLoop());
        }

        /// <summary>
        /// Non-blocking enqueue. If the queue is full (or already completed) the message is
        /// written to the local fallback. Never blocks and never throws.
        /// </summary>
        /// <param name="msg">Message to log; <c>null</c> is ignored.</param>
        public void Enqueue(OperationLogMessage msg)
        {
            if (_disposed || msg == null)
            {
                return;
            }

            try
            {
                // Count the message before publishing it so Flush can never observe a queued
                // message together with a zero pending count.
                Interlocked.Increment(ref _pending);
                bool queued = false;
                try
                {
                    queued = _channel.Writer.TryWrite(msg);
                }
                finally
                {
                    if (!queued)
                    {
                        Interlocked.Decrement(ref _pending);
                    }
                }

                if (!queued)
                {
                    // Queue-full degradation: operation-level entries go to the local fallback
                    // (not lost). Debug-level entries are not expected to reach this queue.
                    _local.WriteFallback(SafeJson(msg));
                }
            }
            catch (Exception ex)
            {
                ReportSelfError("Enqueue failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Background worker: waits for data, drains up to one batch, and delivers each message.
        /// Exits when the channel is completed and empty, or when shutdown is signalled.
        /// </summary>
        private async Task RunLoop()
        {
            try
            {
                // Capture once: Dispose may dispose the token source while we are still running.
                var token = _cts.Token;
                var reader = _channel.Reader;

                // A batch size of 0 would spin forever without reading; negative would throw.
                int batchSize = Math.Max(1, _options.BatchSize);
                var batch = new List<OperationLogMessage>(batchSize);

                while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
                {
                    batch.Clear();
                    while (batch.Count < batchSize && reader.TryRead(out var item))
                    {
                        batch.Add(item);
                    }

                    foreach (var queued in batch)
                    {
                        Deliver(queued);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Normal shutdown.
            }
            catch (Exception ex)
            {
                ReportSelfError("Pipeline loop crashed: " + ex.Message);
            }
        }

        /// <summary>
        /// Serializes and sends one message; on send failure writes it to the local fallback and
        /// records the error. Always releases the message's pending slot.
        /// </summary>
        private void Deliver(OperationLogMessage msg)
        {
            try
            {
                var json = SafeJson(msg);
                try
                {
                    _transport.Send(json);
                }
                catch (Exception ex)
                {
                    // Send failed: keep the payload locally (it can be re-sent later) and log it.
                    WriteFallbackSafe(json);
                    ReportSelfError("Transport.Send failed: " + ex.Message);
                }
            }
            finally
            {
                Interlocked.Decrement(ref _pending);
            }
        }

        /// <summary>
        /// Serializes a message to JSON. On failure records the error and returns <c>"{}"</c>.
        /// </summary>
        private string SafeJson(OperationLogMessage m)
        {
            try
            {
                return JsonConvert.SerializeObject(m);
            }
            catch (Exception ex)
            {
                ReportSelfError("Message serialize failed: " + ex.Message);
                return "{}";
            }
        }

        /// <summary>Writes a payload to the local fallback without letting a failure escape.</summary>
        private void WriteFallbackSafe(string json)
        {
            try
            {
                _local.WriteFallback(json);
            }
            catch (Exception ex)
            {
                ReportSelfError("Fallback write failed: " + ex.Message);
            }
        }

        /// <summary>Records a pipeline self-diagnostic; never throws.</summary>
        private void ReportSelfError(string message)
        {
            try
            {
                _local.WriteSelfError(message);
            }
            catch
            {
                // Last resort: the diagnostics sink itself failed and there is nowhere left to
                // report to. Swallow so logging can never break the business call path.
            }
        }

        /// <summary>
        /// Moves anything still queued to the local fallback. Must only be called after the
        /// worker has stopped, because the channel is configured for a single reader.
        /// </summary>
        private void DrainToFallback()
        {
            while (_channel.Reader.TryRead(out var leftover))
            {
                try
                {
                    WriteFallbackSafe(SafeJson(leftover));
                }
                finally
                {
                    Interlocked.Decrement(ref _pending);
                }
            }
        }

        /// <summary>
        /// Waits (up to <paramref name="timeout"/>) until every accepted message has been handled
        /// by the worker, then flushes the transport. Intended for tests and shutdown.
        /// </summary>
        /// <param name="timeout">Maximum time to wait for the queue; also passed to the transport flush.</param>
        public void Flush(TimeSpan timeout)
        {
            try
            {
                var deadline = DateTime.UtcNow + timeout;

                // Wait on the pending count, not the channel count: messages already dequeued
                // into the worker's batch must be sent before the transport is flushed.
                while (Volatile.Read(ref _pending) > 0 && DateTime.UtcNow < deadline)
                {
                    Thread.Sleep(20);
                }

                _transport.Flush(timeout);
            }
            catch (Exception ex)
            {
                ReportSelfError("Flush failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Stops accepting messages, flushes what is pending, stops the worker, and writes any
        /// messages that could not be sent in time to the local fallback. Never throws.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            try
            {
                _channel.Writer.TryComplete();
                Flush(TimeSpan.FromSeconds(3));
                _cts.Cancel();

                // Only drain when the worker has really stopped; otherwise two readers would
                // race on a single-reader channel.
                if (_worker.Wait(TimeSpan.FromSeconds(3)))
                {
                    DrainToFallback();
                }
            }
            catch (Exception ex)
            {
                ReportSelfError("Dispose failed: " + ex.Message);
            }
            finally
            {
                try
                {
                    _transport.Dispose();
                }
                catch (Exception ex)
                {
                    ReportSelfError("Transport dispose failed: " + ex.Message);
                }

                _cts.Dispose();
            }
        }
    }
}