OperationLogPipeline.cs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Channels;
  5. using System.Threading.Tasks;
  6. using Newtonsoft.Json;
  namespace Aivfo.OperationLog
  {
      /// <summary>
      /// Asynchronous operation-log pipeline (spec 14 §9/§11/§12).
      /// Business threads enqueue without blocking into a bounded channel. When the channel is full, or
      /// already completed, the message is written to the local fallback file instead. A single background
      /// worker drains the channel in batches and sends each message through <see cref="IOplogTransport"/>.
      /// Enqueue never blocks; errors are reported through the local writer instead of being thrown.
      /// </summary>
      internal sealed class OperationLogPipeline : IDisposable
      {
          private readonly OperationLogOptions _options;
          private readonly IOplogTransport _transport;
          private readonly LocalFileWriter _local;
          private readonly Channel<OperationLogMessage> _channel;
          private readonly CancellationTokenSource _cts = new CancellationTokenSource();
          private readonly Task _worker;
          private readonly Timer _resendTimer; // G3-3: periodic fallback-resend timer (null when resend is disabled)
          private int _resending; // resend re-entrancy guard (0 = idle, 1 = running)
          private int _pending;   // messages accepted by the channel and not yet handed to the transport/fallback
          private volatile bool _disposed;

          /// <summary>Creates the pipeline, starts the background worker and, if enabled, the resend timer.</summary>
          /// <param name="options">Pipeline settings. Queue capacity (min 16), batch size (min 1), resend period (min 5 s) and files per resend cycle (min 1) are clamped.</param>
          /// <param name="transport">Destination for serialized messages; disposed by <see cref="Dispose"/>.</param>
          /// <param name="local">Local writer used for fallback files and self-error logging.</param>
          public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
          {
              _options = options;
              _transport = transport;
              _local = local;
              _channel = Channel.CreateBounded<OperationLogMessage>(new BoundedChannelOptions(Math.Max(16, options.QueueCapacity))
              {
                  // Wait mode: TryWrite returns false when the channel is full and never blocks (only WriteAsync would wait).
                  // DropWrite must NOT be used here: it makes TryWrite return true while silently discarding the
                  // item, so the fallback branch in Enqueue would never run and messages would be lost.
                  FullMode = BoundedChannelFullMode.Wait,
                  SingleReader = true,
                  SingleWriter = false
              });
              // Task.Run always targets the default scheduler and unwraps the async delegate.
              // Task.Factory.StartNew would inherit TaskScheduler.Current, and LongRunning is wasted on an
              // async loop because the dedicated thread is released at the first await.
              _worker = Task.Run(RunLoop);
              // G3-3 / spec 14 §11: periodically resend the local fallback files, so whatever was not delivered
              // gets sent once Kafka is available again.
              if (_options.EnableFallbackResend)
              {
                  var periodMs = Math.Max(5, _options.FallbackResendSeconds) * 1000;
                  _resendTimer = new Timer(TryResend, null, periodMs, periodMs);
              }
          }

          /// <summary>
          /// Non-blocking enqueue. When the queue is full, or already completed, the message goes to the local
          /// fallback file instead of being dropped. Never blocks; failures are logged through the local writer.
          /// Null messages and calls after <see cref="Dispose"/> has started are ignored.
          /// </summary>
          public void Enqueue(OperationLogMessage msg)
          {
              if (_disposed || msg == null)
              {
                  return;
              }
              try
              {
                  // Count before writing, so the worker can never decrement below the real backlog (Flush relies on it).
                  Interlocked.Increment(ref _pending);
                  if (!_channel.Writer.TryWrite(msg))
                  {
                      Interlocked.Decrement(ref _pending);
                      // Queue full: degrade to the local fallback file so the message is not lost.
                      // Debug-level logs are not expected here, since they do not enter the Kafka queue.
                      _local.WriteFallback(SafeJson(msg));
                  }
              }
              catch (Exception ex)
              {
                  _local.WriteSelfError("Enqueue failed: " + ex.Message);
              }
          }

          /// <summary>
          /// Background consumer. Waits for data, takes up to BatchSize messages at a time and sends each one.
          /// Exits normally once the channel is completed and drained. If cancelled by <see cref="Dispose"/>,
          /// it writes the messages still queued to the local fallback file.
          /// </summary>
          private async Task RunLoop()
          {
              var reader = _channel.Reader;
              // Captured once: Dispose disposes _cts, and reading CancellationTokenSource.Token after
              // disposal throws, while this loop may still be finishing.
              var token = _cts.Token;
              // Clamp like the other options: 0 would spin forever without reading; a negative value would throw.
              var batchSize = Math.Max(1, _options.BatchSize);
              var batch = new List<OperationLogMessage>(batchSize);
              try
              {
                  while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
                  {
                      batch.Clear();
                      while (batch.Count < batchSize && reader.TryRead(out var next))
                      {
                          batch.Add(next);
                      }
                      foreach (var message in batch)
                      {
                          SendOne(message);
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  // Shutdown after the Dispose grace period: keep what is still queued instead of dropping it.
                  SpillRemainingToFallback(reader);
              }
              catch (Exception ex)
              {
                  _local.WriteSelfError("Pipeline loop crashed: " + ex.Message);
              }
          }

          /// <summary>
          /// Sends one message. If the transport throws, the serialized message is written to the local fallback
          /// file and the error is logged. In every case the message is removed from the pending count.
          /// </summary>
          private void SendOne(OperationLogMessage message)
          {
              try
              {
                  var json = SafeJson(message);
                  try
                  {
                      _transport.Send(json);
                  }
                  catch (Exception ex)
                  {
                      // Send failed: keep the message in the local fallback file (resent later when fallback
                      // resend is enabled) and record the error.
                      _local.WriteFallback(json);
                      _local.WriteSelfError("Transport.Send failed: " + ex.Message);
                  }
              }
              catch (Exception ex)
              {
                  // The fallback write itself failed. Log and keep the worker alive: letting this escape would
                  // end RunLoop and stop all further delivery for the life of the pipeline.
                  _local.WriteSelfError("Fallback write failed: " + ex.Message);
              }
              finally
              {
                  Interlocked.Decrement(ref _pending);
              }
          }

          /// <summary>
          /// Writes every message still in the channel to the local fallback file. Called only from the worker,
          /// so the channel keeps a single reader.
          /// </summary>
          private void SpillRemainingToFallback(ChannelReader<OperationLogMessage> reader)
          {
              var spilled = 0;
              while (reader.TryRead(out var message))
              {
                  try
                  {
                      _local.WriteFallback(SafeJson(message));
                      spilled++;
                  }
                  catch (Exception ex)
                  {
                      _local.WriteSelfError("Shutdown spill failed: " + ex.Message);
                  }
                  finally
                  {
                      Interlocked.Decrement(ref _pending);
                  }
              }
              if (spilled > 0)
              {
                  _local.WriteSelfError($"Pipeline stopped: {spilled} queued messages written to fallback.");
              }
          }

          /// <summary>
          /// Serializes a message with Json.NET. On failure it logs the error and returns "{}", so callers
          /// always get a string.
          /// </summary>
          private string SafeJson(OperationLogMessage m)
          {
              try
              {
                  return JsonConvert.SerializeObject(m);
              }
              catch (Exception ex)
              {
                  _local.WriteSelfError("Message serialize failed: " + ex.Message);
                  return "{}";
              }
          }

          /// <summary>
          /// G3-3 / spec 14 §11 fallback resend, run by the timer. It periodically re-delivers logs that were left
          /// unsent in the oplog-fallback-*.log files.
          /// Approach: claim a file (renamed to .resending, to avoid racing with appends), Send it line by line,
          /// then delete the claimed file.
          /// If Kafka is still unavailable, asynchronous delivery failures are written to a new fallback file by the
          /// transport's onDeliveryFailed and retried next cycle. Lines whose Send throws synchronously are written
          /// back to a fallback file here. Deleting the claimed file therefore does not lose logs; the worst case is
          /// duplicates or loss in the narrow window of a process crash (best-effort).
          /// Guarded against re-entrancy, limited to ResendFilesPerCycle files per cycle, and fully wrapped in
          /// try/catch so it never affects business code.
          /// </summary>
          private void TryResend(object state)
          {
              if (_disposed || !_options.EnableFallbackResend)
              {
                  return;
              }
              // Only one resend cycle may run at a time.
              if (Interlocked.CompareExchange(ref _resending, 1, 0) != 0)
              {
                  return;
              }
              try
              {
                  var files = _local.ListFallbackFiles();
                  if (files == null || files.Length == 0)
                  {
                      return;
                  }
                  var limit = Math.Min(files.Length, Math.Max(1, _options.ResendFilesPerCycle));
                  int resentLines = 0;
                  for (int i = 0; i < limit; i++)
                  {
                      if (_disposed)
                      {
                          break;
                      }
                      var claimed = _local.ClaimFallback(files[i]);
                      if (claimed == null)
                      {
                          continue; // already claimed elsewhere, or no longer exists
                      }
                      var lines = _local.ReadLines(claimed);
                      foreach (var line in lines)
                      {
                          if (string.IsNullOrWhiteSpace(line))
                          {
                              continue;
                          }
                          try
                          {
                              _transport.Send(line);
                              resentLines++;
                          }
                          catch (Exception ex)
                          {
                              // The claimed file is deleted below, so a synchronous Send failure must put the line
                              // back into a fallback file; otherwise it would be lost.
                              _local.WriteFallback(line);
                              _local.WriteSelfError("Resend Send failed: " + ex.Message);
                          }
                      }
                      _local.DeleteQuietly(claimed);
                  }
                  // Try to push what was just resent to the broker, with a short timeout so this background
                  // thread is not held for long.
                  if (resentLines > 0)
                  {
                      try
                      {
                          _transport.Flush(TimeSpan.FromSeconds(2));
                      }
                      catch (Exception ex)
                      {
                          _local.WriteSelfError("Resend flush failed: " + ex.Message);
                      }
                      _local.WriteSelfError($"Fallback resend cycle: 重投 {resentLines} 条。");
                  }
              }
              catch (Exception ex)
              {
                  _local.WriteSelfError("Resend cycle failed: " + ex.Message);
              }
              finally
              {
                  Interlocked.Exchange(ref _resending, 0);
              }
          }

          /// <summary>
          /// Drains the queue and flushes the transport (used for tests and shutdown).
          /// First it polls every 20 ms, for up to <paramref name="timeout"/>, until every accepted message has been
          /// sent or written to the fallback file. Then it calls the transport's Flush with the same
          /// <paramref name="timeout"/>, so the total time can approach twice that value. Errors are logged, not thrown.
          /// </summary>
          public void Flush(TimeSpan timeout)
          {
              try
              {
                  var deadline = DateTime.UtcNow + timeout;
                  // Wait on _pending rather than Reader.Count: a batch already taken off the channel but still
                  // being sent must count as not yet drained.
                  while (Volatile.Read(ref _pending) > 0 && DateTime.UtcNow < deadline)
                  {
                      Thread.Sleep(20);
                  }
                  _transport.Flush(timeout);
              }
              catch (Exception ex)
              {
                  _local.WriteSelfError("Flush failed: " + ex.Message);
              }
          }

          /// <summary>
          /// Shuts the pipeline down, in this order:
          /// stops the resend timer and waits up to 3 s for a resend cycle that is already running;
          /// completes the channel;
          /// flushes (up to 3 s for the queue, then up to 3 s for the transport);
          /// cancels the worker, which writes any leftovers to the local fallback file, and waits up to 3 s for it;
          /// finally disposes the transport.
          /// Errors are logged, never thrown. A call made after disposal has started returns immediately.
          /// </summary>
          public void Dispose()
          {
              if (_disposed)
              {
                  return;
              }
              _disposed = true;
              try
              {
                  _resendTimer?.Dispose();
                  // Timer.Dispose does not wait for a callback that is already running; don't dispose the
                  // transport underneath an in-progress resend cycle.
                  SpinWait.SpinUntil(() => Volatile.Read(ref _resending) == 0, TimeSpan.FromSeconds(3));
                  _channel.Writer.TryComplete();
                  Flush(TimeSpan.FromSeconds(3));
                  _cts.Cancel();
                  _worker.Wait(TimeSpan.FromSeconds(3));
              }
              catch (Exception ex)
              {
                  TrySelfError("Dispose failed: " + ex.Message);
              }
              finally
              {
                  try
                  {
                      _transport.Dispose();
                  }
                  catch (Exception ex)
                  {
                      TrySelfError("Transport dispose failed: " + ex.Message);
                  }
                  _cts.Dispose();
              }
          }

          /// <summary>Self-error logging for places where no exception may escape (Dispose).</summary>
          private void TrySelfError(string message)
          {
              try
              {
                  _local.WriteSelfError(message);
              }
              catch
              {
                  // Nothing left to report to; Dispose must not throw.
              }
          }
      }
  }