| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Channels;
- using System.Threading.Tasks;
- using Newtonsoft.Json;
- namespace Aivfo.OperationLog
- {
- /// <summary>
- /// 异步管道(14§9/§11/§12):
- /// 业务线程非阻塞入队(有界 Channel,满了丢弃 + 本地兜底)→ 后台单线程批量取出 → IOplogTransport 发送。
- /// 入队绝不阻塞、绝不抛异常影响业务。
- /// </summary>
- internal sealed class OperationLogPipeline : IDisposable
- {
- private readonly OperationLogOptions _options;
- private readonly IOplogTransport _transport;
- private readonly LocalFileWriter _local;
- private readonly Channel<OperationLogMessage> _channel;
- private readonly CancellationTokenSource _cts = new CancellationTokenSource();
- private readonly Task _worker;
- private readonly Timer _resendTimer; // G3-3:兜底补送定时器
- private int _resending; // 补送重入保护(0=空闲,1=进行中)
- private volatile bool _disposed;
- public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
- {
- _options = options;
- _transport = transport;
- _local = local;
- _channel = Channel.CreateBounded<OperationLogMessage>(new BoundedChannelOptions(Math.Max(16, options.QueueCapacity))
- {
- FullMode = BoundedChannelFullMode.DropWrite, // 满了让 TryWrite 返回 false,由我们走兜底,绝不阻塞
- SingleReader = true,
- SingleWriter = false
- });
- _worker = Task.Factory.StartNew(RunLoop, TaskCreationOptions.LongRunning).Unwrap();
- // G3-3 / 14§11:后台周期补送本地兜底文件(Kafka 恢复后把没发出的补回)。
- if (_options.EnableFallbackResend)
- {
- var periodMs = Math.Max(5, _options.FallbackResendSeconds) * 1000;
- _resendTimer = new Timer(TryResend, null, periodMs, periodMs);
- }
- }
- /// <summary>非阻塞入队。满了走本地兜底(不丢失),永不阻塞/抛异常。</summary>
- public void Enqueue(OperationLogMessage msg)
- {
- if (_disposed || msg == null) return;
- try
- {
- if (!_channel.Writer.TryWrite(msg))
- {
- // 队列满降级:操作级落本地兜底(不丢),调试级本不该到这(Debug 不入 Kafka 队列)。
- _local.WriteFallback(SafeJson(msg));
- }
- }
- catch (Exception ex)
- {
- _local.WriteSelfError("Enqueue failed: " + ex.Message);
- }
- }
- private async Task RunLoop()
- {
- var reader = _channel.Reader;
- var batch = new List<OperationLogMessage>(_options.BatchSize);
- try
- {
- while (await reader.WaitToReadAsync(_cts.Token).ConfigureAwait(false))
- {
- batch.Clear();
- while (batch.Count < _options.BatchSize && reader.TryRead(out var m))
- batch.Add(m);
- foreach (var m in batch)
- {
- var json = SafeJson(m);
- try
- {
- _transport.Send(json);
- }
- catch (Exception ex)
- {
- // 发送失败:本地兜底(恢复后可选补送),并记自身错误。
- _local.WriteFallback(json);
- _local.WriteSelfError("Transport.Send failed: " + ex.Message);
- }
- }
- }
- }
- catch (OperationCanceledException) { /* 正常退出 */ }
- catch (Exception ex)
- {
- _local.WriteSelfError("Pipeline loop crashed: " + ex.Message);
- }
- }
- private string SafeJson(OperationLogMessage m)
- {
- try { return JsonConvert.SerializeObject(m); }
- catch (Exception ex)
- {
- _local.WriteSelfError("Message serialize failed: " + ex.Message);
- return "{}";
- }
- }
- /// <summary>
- /// G3-3 / 14§11 兜底补送:周期把 oplog-fallback-*.log 里没发出的日志重新投递。
- /// 做法:认领文件(重命名 .resending,避开 append 竞争)→ 逐行 Send → 删除认领文件。
- /// 若此刻 Kafka 仍不可用,投递失败会由传输层 onDeliveryFailed 重新落新兜底文件,下轮再试——
- /// 故删除认领文件不会丢日志(最坏是极小概率进程崩溃窗口内的重复/丢失,best-effort)。
- /// 重入保护 + 每轮限流 + 全 try 兜底,绝不影响业务。
- /// </summary>
- private void TryResend(object state)
- {
- if (_disposed || !_options.EnableFallbackResend) return;
- // 同一时刻只允许一个补送在跑。
- if (Interlocked.CompareExchange(ref _resending, 1, 0) != 0) return;
- try
- {
- var files = _local.ListFallbackFiles();
- if (files == null || files.Length == 0) return;
- var limit = Math.Min(files.Length, Math.Max(1, _options.ResendFilesPerCycle));
- int resentLines = 0;
- for (int i = 0; i < limit; i++)
- {
- if (_disposed) break;
- var claimed = _local.ClaimFallback(files[i]);
- if (claimed == null) continue; // 已被别处认领/不存在
- var lines = _local.ReadLines(claimed);
- foreach (var line in lines)
- {
- if (string.IsNullOrWhiteSpace(line)) continue;
- try { _transport.Send(line); resentLines++; }
- catch (Exception ex) { _local.WriteSelfError("Resend Send failed: " + ex.Message); }
- }
- _local.DeleteQuietly(claimed);
- }
- // 尽量把刚补送的推到 broker(短超时,不长时间挂后台线程)。
- if (resentLines > 0)
- {
- try { _transport.Flush(TimeSpan.FromSeconds(2)); } catch { }
- _local.WriteSelfError($"Fallback resend cycle: 重投 {resentLines} 条。");
- }
- }
- catch (Exception ex)
- {
- _local.WriteSelfError("Resend cycle failed: " + ex.Message);
- }
- finally
- {
- Interlocked.Exchange(ref _resending, 0);
- }
- }
- /// <summary>排空队列并 flush 传输(用于测试/退出)。</summary>
- public void Flush(TimeSpan timeout)
- {
- try
- {
- var deadline = DateTime.UtcNow + timeout;
- // 等待 channel 排空(后台线程在消费)。
- while (_channel.Reader.Count > 0 && DateTime.UtcNow < deadline)
- Thread.Sleep(20);
- _transport.Flush(timeout);
- }
- catch (Exception ex)
- {
- _local.WriteSelfError("Flush failed: " + ex.Message);
- }
- }
- public void Dispose()
- {
- if (_disposed) return;
- _disposed = true;
- try
- {
- try { _resendTimer?.Dispose(); } catch { }
- _channel.Writer.TryComplete();
- Flush(TimeSpan.FromSeconds(3));
- _cts.Cancel();
- _worker.Wait(TimeSpan.FromSeconds(3));
- }
- catch { /* ignore */ }
- finally
- {
- _transport.Dispose();
- _cts.Dispose();
- }
- }
- }
- }
|