- using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Channels;
- using System.Threading.Tasks;
- using Newtonsoft.Json;
- namespace Aivfo.OperationLog
- {
- /// <summary>
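- /// Asynchronous pipeline (14§9/§11/§12):
- /// business threads enqueue without blocking (bounded Channel; when it is full the message is diverted to the local fallback) → a single background worker reads in batches → IOplogTransport sends.
- /// Enqueueing never blocks and never throws an exception that could affect business code.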
- /// </summary>
- internal sealed class OperationLogPipeline : IDisposable
- {
- private readonly OperationLogOptions _options;
- private readonly IOplogTransport _transport;
- private readonly LocalFileWriter _local;
- private readonly Channel<OperationLogMessage> _channel;
- private readonly CancellationTokenSource _cts = new CancellationTokenSource();
- private readonly Task _worker;
- private volatile bool _disposed;
- public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
- {
- _options = options;
- _transport = transport;
- _local = local;
- _channel = Channel.CreateBounded<OperationLogMessage>(new BoundedChannelOptions(Math.Max(16, options.QueueCapacity))
- {
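- // Wait mode: when the queue is full, TryWrite returns false immediately (it never blocks), so Enqueue can take the fallback path.
- // DropWrite would silently discard the item while TryWrite still returns true, bypassing the fallback.
- FullMode = BoundedChannelFullMode.Wait,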
- SingleReader = true,
- SingleWriter = false
- });
- // RunLoop is async, so LongRunning would only cover the code before its first await; Task.Run is sufficient here.
- _worker = Task.Run(() => RunLoop());
- }
- /// <summary>Non-blocking enqueue. When the queue is full, the message is written to the local fallback (not lost); never blocks or throws.</summary>
- public void Enqueue(OperationLogMessage msg)
- {
- if (_disposed || msg == null) return;
- try
- {
- if (!_channel.Writer.TryWrite(msg))
- {
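- // Queue-full degradation: operation-level messages go to the local fallback (not lost); debug-level messages should never get here (Debug does not enter the Kafka queue).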
- _local.WriteFallback(SafeJson(msg));
- }
- }
- catch (Exception ex)
- {
- _local.WriteSelfError("Enqueue failed: " + ex.Message);
- }
- }
- private async Task RunLoop()
- {
- var reader = _channel.Reader;
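- // Clamp the batch size: with a value of 0 the inner loop would read nothing and the outer loop would spin forever.
- var batchSize = Math.Max(1, _options.BatchSize);
- var batch = new List<OperationLogMessage>(batchSize);
- try
- {
- while (await reader.WaitToReadAsync(_cts.Token).ConfigureAwait(false))
- {
- batch.Clear();
- while (batch.Count < batchSize && reader.TryRead(out var m))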
- batch.Add(m);
- foreach (var m in batch)
- {
- var json = SafeJson(m);
- try
- {
- _transport.Send(json);
- }
- catch (Exception ex)
- {
- // Send failed: write to the local fallback (can optionally be re-sent after recovery) and record the pipeline's own error.
- _local.WriteFallback(json);
- _local.WriteSelfError("Transport.Send failed: " + ex.Message);
- }
- }
- }
- }
- catch (OperationCanceledException) { /* normal exit */ }
- catch (Exception ex)
- {
- _local.WriteSelfError("Pipeline loop crashed: " + ex.Message);
- }
- }
- private string SafeJson(OperationLogMessage m)
- {
- try { return JsonConvert.SerializeObject(m); }
- catch (Exception ex)
- {
- _local.WriteSelfError("Message serialize failed: " + ex.Message);
- return "{}";
- }
- }
- /// <summary>Drains the queue and flushes the transport (for tests and shutdown).</summary>
- public void Flush(TimeSpan timeout)
- {
- try
- {
- var deadline = DateTime.UtcNow + timeout;
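- // Wait for the channel to drain (the background worker is consuming).
- // Messages already dequeued into the worker's current batch are not counted by Reader.Count.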
- while (_channel.Reader.Count > 0 && DateTime.UtcNow < deadline)
- Thread.Sleep(20);
- _transport.Flush(timeout);
- }
- catch (Exception ex)
- {
- _local.WriteSelfError("Flush failed: " + ex.Message);
- }
- }
- public void Dispose()
- {
- if (_disposed) return;
- _disposed = true;
- try
- {
- _channel.Writer.TryComplete();
- Flush(TimeSpan.FromSeconds(3));
- _cts.Cancel();
- _worker.Wait(TimeSpan.FromSeconds(3));
- }
- catch { /* ignore */ }
- finally
- {
- _transport.Dispose();
- _cts.Dispose();
- }
- }
- }
- }
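The fallback path in `Enqueue` depends on `TryWrite` reporting a full queue. The following is a minimal standalone probe, not part of the pipeline, that can be run to check how `TryWrite` behaves on a full bounded channel under the two relevant full modes. `FullModeDemo` and `Probe` are hypothetical names introduced only for this sketch, and the expected output in the comments assumes the `System.Threading.Channels` implementation shipped with current .NET versions.

```csharp
using System;
using System.Threading.Channels;

internal static class FullModeDemo
{
    // Hypothetical helper: creates a capacity-1 channel, fills it, then reports
    // what a second TryWrite returns and how many items are actually queued.
    private static void Probe(BoundedChannelFullMode mode)
    {
        var channel = Channel.CreateBounded<string>(
            new BoundedChannelOptions(1) { FullMode = mode });

        channel.Writer.TryWrite("first");                   // fills the channel
        bool accepted = channel.Writer.TryWrite("second");  // channel is already full

        Console.WriteLine($"{mode}: TryWrite={accepted}, queued={channel.Reader.Count}");
    }

    private static void Main()
    {
        Probe(BoundedChannelFullMode.DropWrite); // DropWrite: TryWrite=True, queued=1
        Probe(BoundedChannelFullMode.Wait);      // Wait: TryWrite=False, queued=1
    }
}
```

In both modes the second item is not queued, but only `Wait` makes that visible to the caller through a `false` return value, which is the signal a fallback branch after `TryWrite` needs.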