using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Aivfo.OperationLog
{
    /// <summary>
    /// Asynchronous operation-log pipeline (14§9/§11/§12):
    /// business threads enqueue without blocking (bounded <see cref="Channel{T}"/>; when it is full the
    /// message goes to the local fallback file instead) → a single background consumer drains it in
    /// batches → each message is sent through <c>IOplogTransport</c>.
    /// Enqueueing never blocks and never lets an exception escape into business code.
    /// </summary>
    internal sealed class OperationLogPipeline : IDisposable
    {
        private readonly OperationLogOptions _options;
        private readonly IOplogTransport _transport;
        private readonly LocalFileWriter _local;
        private readonly Channel<OperationLogMessage> _channel;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private readonly Task _worker;
        private readonly Timer _resendTimer; // G3-3: periodic fallback-resend timer (null when disabled).
        private int _resending;              // TryResend re-entrancy guard (0 = idle, 1 = running or taken by Dispose).
        private int _draining;               // 1 while the worker is sending a batch it already took off the channel.
        private volatile bool _disposed;

        /// <summary>
        /// Creates the pipeline, starts the background consumer and, when enabled, the fallback-resend timer.
        /// </summary>
        /// <param name="options">Queue capacity, batch size and fallback-resend settings.</param>
        /// <param name="transport">Destination for serialized messages; disposed together with this pipeline.</param>
        /// <param name="local">Local writer used for fallback lines and for the pipeline's own error log.</param>
        public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
        {
            _options = options;
            _transport = transport;
            _local = local;
            _channel = Channel.CreateBounded<OperationLogMessage>(new BoundedChannelOptions(Math.Max(16, options.QueueCapacity))
            {
                // Wait mode is what makes TryWrite return false when the queue is full (TryWrite itself never
                // blocks; only WriteAsync would wait). DropWrite would make TryWrite return true and silently
                // discard the message, so the local fallback in Enqueue would never run.
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = true,
                SingleWriter = false
            });

            // The loop is async: a LongRunning thread would be released at its first await anyway, and
            // Task.Factory.StartNew would schedule onto TaskScheduler.Current rather than the thread pool.
            _worker = Task.Run(() => RunLoop());

            // G3-3 / 14§11: periodically resend local fallback files (re-deliver what was not sent once Kafka recovers).
            if (_options.EnableFallbackResend)
            {
                var periodMs = Math.Max(5, _options.FallbackResendSeconds) * 1000;
                _resendTimer = new Timer(TryResend, null, periodMs, periodMs);
            }
        }

        /// <summary>
        /// Non-blocking enqueue. When the queue is full the message is written to the local fallback file
        /// instead (not lost). Never blocks and never throws.
        /// </summary>
        /// <param name="msg">Message to send; ignored when null or after <see cref="Dispose"/>.</param>
        public void Enqueue(OperationLogMessage msg)
        {
            if (_disposed || msg == null) return;
            try
            {
                if (!_channel.Writer.TryWrite(msg))
                {
                    // Queue full (or already completed by Dispose): degrade to the local fallback file so
                    // operation-level logs are kept. Debug-level logs should not reach this point (they do
                    // not enter the Kafka queue).
                    _local.WriteFallback(SafeJson(msg));
                }
            }
            catch (Exception ex)
            {
                SafeSelfError("Enqueue failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Background consumer: waits for messages, takes up to <c>BatchSize</c> at a time and sends each one
        /// (see <see cref="SendOne"/>). Ends when the channel is completed and drained, or when <c>_cts</c> is cancelled.
        /// </summary>
        private async Task RunLoop()
        {
            var reader = _channel.Reader;
            // A non-positive BatchSize would make the drain loop read nothing while WaitToReadAsync keeps
            // returning true: a 100% CPU spin with the queue never emptied.
            var batchSize = Math.Max(1, _options.BatchSize);
            var batch = new List<OperationLogMessage>(batchSize);
            try
            {
                while (await reader.WaitToReadAsync(_cts.Token).ConfigureAwait(false))
                {
                    // Raised BEFORE taking items so Flush never observes "queue empty" while a batch is unsent.
                    Interlocked.Exchange(ref _draining, 1);
                    try
                    {
                        while (batch.Count < batchSize && reader.TryRead(out var item))
                        {
                            batch.Add(item);
                        }

                        foreach (var message in batch)
                        {
                            SendOne(message);
                        }
                    }
                    finally
                    {
                        batch.Clear();
                        Volatile.Write(ref _draining, 0);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Normal shutdown (Dispose cancelled _cts).
            }
            catch (Exception ex)
            {
                SafeSelfError("Pipeline loop crashed: " + ex.Message);
            }
        }

        /// <summary>
        /// Serializes and sends one message; if the send fails, writes it to the local fallback file and
        /// records the error. Never throws, so a broken local disk cannot kill the single consumer.
        /// </summary>
        private void SendOne(OperationLogMessage message)
        {
            try
            {
                var json = SafeJson(message);
                try
                {
                    _transport.Send(json);
                }
                catch (Exception ex)
                {
                    // Send failed: keep it in the local fallback (resent later when enabled) and record why.
                    _local.WriteFallback(json);
                    _local.WriteSelfError("Transport.Send failed: " + ex.Message);
                }
            }
            catch (Exception ex)
            {
                // The local fallback itself failed. Keep draining: a dead consumer would let the channel fill
                // up and push every later message onto the same failing disk anyway.
                SafeSelfError("Pipeline message dropped: " + ex.Message);
            }
        }

        /// <summary>Serializes a message to JSON; on failure records the error and returns <c>"{}"</c>.</summary>
        private string SafeJson(OperationLogMessage m)
        {
            try
            {
                return JsonConvert.SerializeObject(m);
            }
            catch (Exception ex)
            {
                _local.WriteSelfError("Message serialize failed: " + ex.Message);
                return "{}";
            }
        }

        /// <summary>
        /// Writes to the pipeline's own error log, swallowing any failure of that write
        /// (there is nowhere further to report it without affecting business threads).
        /// </summary>
        private void SafeSelfError(string text)
        {
            try
            {
                _local.WriteSelfError(text);
            }
            catch
            {
                // Last-resort channel; intentionally ignored.
            }
        }

        /// <summary>
        /// G3-3 / 14§11 fallback resend (timer callback): re-delivers lines from oplog-fallback-*.log files
        /// that were not sent. For each file: claim it (per the original design, a rename to .resending that
        /// avoids contention with appends) → Send line by line → delete the claimed file.
        /// A line whose Send throws synchronously is written back to the fallback file here, because the
        /// transport's asynchronous onDeliveryFailed handling does not see it. If reading or writing back a
        /// file fails, that claimed file is kept rather than deleted. Best effort: a process crash mid-cycle
        /// can still duplicate or lose lines.
        /// Re-entrancy guarded, limited to ResendFilesPerCycle files per cycle, fully try-protected.
        /// </summary>
        /// <param name="state">Unused timer state.</param>
        private void TryResend(object state)
        {
            if (_disposed || !_options.EnableFallbackResend) return;
            // Only one resend may run at a time (Dispose also takes this slot).
            if (Interlocked.CompareExchange(ref _resending, 1, 0) != 0) return;
            try
            {
                var files = _local.ListFallbackFiles();
                if (files == null || files.Length == 0) return;

                var limit = Math.Min(files.Length, Math.Max(1, _options.ResendFilesPerCycle));
                int resentLines = 0;
                for (int i = 0; i < limit; i++)
                {
                    if (_disposed) break;
                    var claimed = _local.ClaimFallback(files[i]);
                    if (claimed == null) continue; // already claimed elsewhere / no longer exists

                    try
                    {
                        foreach (var line in _local.ReadLines(claimed))
                        {
                            if (string.IsNullOrWhiteSpace(line)) continue;
                            try
                            {
                                _transport.Send(line);
                                resentLines++;
                            }
                            catch (Exception ex)
                            {
                                // Without this write-back the line would vanish when the claimed file is deleted below.
                                _local.WriteFallback(line);
                                _local.WriteSelfError("Resend Send failed: " + ex.Message);
                            }
                        }
                        _local.DeleteQuietly(claimed);
                    }
                    catch (Exception ex)
                    {
                        // Keep the claimed file (its lines may not all be re-delivered) and continue with the next one.
                        _local.WriteSelfError("Resend file failed: " + ex.Message);
                    }
                }

                // Try to push what was just resent to the broker (short timeout; do not hold the timer thread long).
                if (resentLines > 0)
                {
                    try
                    {
                        _transport.Flush(TimeSpan.FromSeconds(2));
                    }
                    catch (Exception ex)
                    {
                        _local.WriteSelfError("Resend flush failed: " + ex.Message);
                    }
                    _local.WriteSelfError($"Fallback resend cycle: 重投 {resentLines} 条。");
                }
            }
            catch (Exception ex)
            {
                SafeSelfError("Resend cycle failed: " + ex.Message);
            }
            finally
            {
                Interlocked.Exchange(ref _resending, 0);
            }
        }

        /// <summary>
        /// Drains the queue and flushes the transport (used for tests / shutdown). Waits up to
        /// <paramref name="timeout"/> for the queue to be empty AND the batch in progress to be sent, then
        /// calls the transport's Flush with the same timeout, so the total can exceed <paramref name="timeout"/>.
        /// Never throws.
        /// </summary>
        public void Flush(TimeSpan timeout)
        {
            try
            {
                var clock = Stopwatch.StartNew();
                // Count is checked before _draining: the worker raises _draining before taking items, so while
                // work is pending at least one of the two is non-zero.
                while ((_channel.Reader.Count > 0 || Volatile.Read(ref _draining) != 0) && clock.Elapsed < timeout)
                {
                    Thread.Sleep(20);
                }
                _transport.Flush(timeout);
            }
            catch (Exception ex)
            {
                SafeSelfError("Flush failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Stops the resend timer, completes the queue, drains it (bounded waits), stops the worker and
        /// disposes the transport. Calling it again is a no-op. Exceptions are logged, not thrown.
        /// </summary>
        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            try
            {
                if (_resendTimer != null)
                {
                    _resendTimer.Dispose();
                    // Timer.Dispose() does not wait for a callback that is already running. Taking the resend
                    // slot ourselves waits (bounded) for an in-flight TryResend before the transport is disposed,
                    // and keeps any late callback from starting a new one. The slot is never released.
                    SpinWait.SpinUntil(() => Interlocked.CompareExchange(ref _resending, 1, 0) == 0, TimeSpan.FromSeconds(3));
                }

                _channel.Writer.TryComplete();
                Flush(TimeSpan.FromSeconds(3));
                _cts.Cancel();
                _worker.Wait(TimeSpan.FromSeconds(3));
            }
            catch (Exception ex)
            {
                SafeSelfError("Dispose failed: " + ex.Message);
            }
            finally
            {
                try
                {
                    _transport.Dispose();
                }
                catch (Exception ex)
                {
                    SafeSelfError("Transport dispose failed: " + ex.Message);
                }
                _cts.Dispose();
            }
        }
    }
}