using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace Aivfo.OperationLog
{
    /// <summary>
    /// Asynchronous operation-log pipeline (spec 14 §9/§11/§12).
    /// Business threads enqueue without blocking into a bounded channel. When the channel is full, the
    /// message is written to the local fallback file instead. A single background consumer drains the
    /// channel in batches and hands each message to <see cref="IOplogTransport"/>.
    /// Enqueueing never blocks, and exceptions are caught so they do not affect business code.
    /// </summary>
    internal sealed class OperationLogPipeline : IDisposable
    {
        private readonly OperationLogOptions _options;
        private readonly IOplogTransport _transport;
        private readonly LocalFileWriter _local;
        private readonly Channel<OperationLogMessage> _channel;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private readonly Task _worker;
        private readonly Timer _resendTimer; // G3-3: periodic fallback-resend timer (null when disabled)

        private int _resending;    // resend re-entrancy guard (0 = idle, 1 = running)
        private int _pending;      // messages accepted by the channel but not yet fully handled by the worker
        private int _disposeState; // 0 = live, 1 = Dispose already claimed (prevents double dispose)
        private volatile bool _disposed;

        /// <summary>
        /// Creates the pipeline, starts the background consumer and, when
        /// <c>EnableFallbackResend</c> is set, the periodic fallback-resend timer.
        /// </summary>
        /// <param name="options">Pipeline options (queue capacity, batch size, resend settings).</param>
        /// <param name="transport">Transport that delivers serialized messages.</param>
        /// <param name="local">Local writer used for fallback files and self-error records.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public OperationLogPipeline(OperationLogOptions options, IOplogTransport transport, LocalFileWriter local)
        {
            _options = options ?? throw new ArgumentNullException(nameof(options));
            _transport = transport ?? throw new ArgumentNullException(nameof(transport));
            _local = local ?? throw new ArgumentNullException(nameof(local));

            _channel = Channel.CreateBounded<OperationLogMessage>(new BoundedChannelOptions(Math.Max(16, _options.QueueCapacity))
            {
                // When full, TryWrite returns false and Enqueue takes the fallback path; it never blocks.
                FullMode = BoundedChannelFullMode.DropWrite,
                SingleReader = true,
                SingleWriter = false
            });

            // Task.Run (rather than StartNew + LongRunning + Unwrap) is the idiomatic way to start an async
            // loop. LongRunning only covered the code before the first await, and StartNew used TaskScheduler.Current.
            _worker = Task.Run(() => RunLoop());

            // G3-3 / 14§11: periodically resend the local fallback files, so that logs which could not be
            // sent are delivered once Kafka recovers.
            if (_options.EnableFallbackResend)
            {
                // Computed in long and clamped so a very large FallbackResendSeconds cannot overflow into a
                // negative period (which would make the Timer constructor throw).
                var periodMs = (int)Math.Min(int.MaxValue, Math.Max(5L, _options.FallbackResendSeconds) * 1000L);
                _resendTimer = new Timer(TryResend, null, periodMs, periodMs);
            }
        }

        /// <summary>
        /// Non-blocking enqueue. When the queue is full, or already completed by <see cref="Dispose"/>, the
        /// message is written to the local fallback file instead, so it is not lost. Never blocks; exceptions
        /// are caught and recorded as self-errors.
        /// </summary>
        /// <param name="msg">Message to enqueue; null is ignored.</param>
        public void Enqueue(OperationLogMessage msg)
        {
            if (_disposed || msg == null) return;
            try
            {
                // Count the message before the worker can see it, so _pending never goes negative.
                Interlocked.Increment(ref _pending);
                var queued = false;
                try
                {
                    queued = _channel.Writer.TryWrite(msg);
                }
                finally
                {
                    if (!queued) Interlocked.Decrement(ref _pending);
                }

                if (!queued)
                {
                    // Queue full (or completed): persist operation-level messages locally so they are not lost.
                    // Debug-level messages should never reach here (they are not routed to the Kafka queue).
                    _local.WriteFallback(SafeJson(msg));
                }
            }
            catch (Exception ex)
            {
                _local.WriteSelfError("Enqueue failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Background consumer. Waits for messages, takes up to <c>BatchSize</c> at a time and delivers
        /// each one. Exits when the channel is completed and empty, or when <c>_cts</c> is cancelled.
        /// </summary>
        private async Task RunLoop()
        {
            var reader = _channel.Reader;
            // Captured once so a later _cts.Dispose() cannot make the Token getter throw inside the loop.
            var token = _cts.Token;
            // BatchSize <= 0 would otherwise spin forever (WaitToReadAsync succeeds but nothing is read)
            // or throw from the List constructor, so clamp it like the other options.
            var batchSize = Math.Max(1, _options.BatchSize);
            var batch = new List<OperationLogMessage>(batchSize);
            try
            {
                while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
                {
                    batch.Clear();
                    while (batch.Count < batchSize && reader.TryRead(out var next))
                        batch.Add(next);

                    foreach (var message in batch)
                    {
                        try
                        {
                            DeliverOne(message);
                        }
                        catch (Exception ex)
                        {
                            // Keep the consumer alive: one failing message must not stop the whole pipeline.
                            _local.WriteSelfError("Pipeline message handling failed: " + ex.Message);
                        }
                        finally
                        {
                            Interlocked.Decrement(ref _pending);
                        }
                    }
                }
            }
            catch (OperationCanceledException) { /* normal shutdown */ }
            catch (Exception ex)
            {
                _local.WriteSelfError("Pipeline loop crashed: " + ex.Message);
            }
        }

        /// <summary>
        /// Serializes one message and sends it. If the transport throws synchronously, the JSON is written
        /// to the local fallback file so it can be resent later, and the error is recorded.
        /// </summary>
        private void DeliverOne(OperationLogMessage message)
        {
            var json = SafeJson(message);
            try
            {
                _transport.Send(json);
            }
            catch (Exception ex)
            {
                _local.WriteFallback(json);
                _local.WriteSelfError("Transport.Send failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Serializes <paramref name="m"/> with Json.NET. On failure, records a self-error and returns "{}".
        /// </summary>
        private string SafeJson(OperationLogMessage m)
        {
            try { return JsonConvert.SerializeObject(m); }
            catch (Exception ex)
            {
                _local.WriteSelfError("Message serialize failed: " + ex.Message);
                return "{}";
            }
        }

        /// <summary>
        /// G3-3 / 14§11 fallback resend: periodically re-delivers log lines stored in the
        /// oplog-fallback-*.log files because they could not be sent.
        /// <para>
        /// Approach: claim a file (per the original design, renamed to .resending to avoid racing with
        /// appenders), send it line by line, then delete the claimed file.
        /// </para>
        /// <para>
        /// A line whose Send throws synchronously is written back to the fallback file, so it is retried
        /// next cycle instead of being deleted with the claimed file. The same applies to a line reached
        /// after Dispose has started.
        /// </para>
        /// <para>
        /// Asynchronous delivery failures are, per the design, re-persisted by the transport's
        /// onDeliveryFailed callback (not visible in this file). This is best-effort: a process crash
        /// inside the claim window may still duplicate or lose lines.
        /// </para>
        /// <para>
        /// A re-entrancy guard, a per-cycle file limit and a catch-all keep business code unaffected.
        /// </para>
        /// </summary>
        private void TryResend(object state)
        {
            if (_disposed || !_options.EnableFallbackResend) return;
            // Only one resend cycle may run at a time.
            if (Interlocked.CompareExchange(ref _resending, 1, 0) != 0) return;
            try
            {
                // Re-check after taking the guard: Dispose may have started in between.
                if (_disposed) return;

                var files = _local.ListFallbackFiles();
                if (files == null || files.Length == 0) return;

                var limit = Math.Min(files.Length, Math.Max(1, _options.ResendFilesPerCycle));
                var resentLines = 0;
                for (var i = 0; i < limit && !_disposed; i++)
                {
                    var claimed = _local.ClaimFallback(files[i]);
                    if (claimed == null) continue; // already claimed elsewhere, or no longer exists

                    foreach (var line in _local.ReadLines(claimed))
                    {
                        if (string.IsNullOrWhiteSpace(line)) continue;
                        if (ResendLine(line)) resentLines++;
                    }
                    _local.DeleteQuietly(claimed);
                }

                // Try to push what was just resent to the broker (short timeout, so the
                // background thread is not held for long).
                if (resentLines > 0)
                {
                    try
                    {
                        _transport.Flush(TimeSpan.FromSeconds(2));
                    }
                    catch (Exception ex)
                    {
                        _local.WriteSelfError("Resend flush failed: " + ex.Message);
                    }
                    _local.WriteSelfError($"Fallback resend cycle: 重投 {resentLines} 条。");
                }
            }
            catch (Exception ex)
            {
                _local.WriteSelfError("Resend cycle failed: " + ex.Message);
            }
            finally
            {
                Interlocked.Exchange(ref _resending, 0);
            }
        }

        /// <summary>
        /// Re-sends one fallback line. Returns true when the transport accepted it. On a synchronous
        /// failure, or once shutdown has begun, the line is written back to the fallback file so it is
        /// not lost, and false is returned.
        /// </summary>
        private bool ResendLine(string line)
        {
            if (_disposed)
            {
                // Shutting down: don't touch the transport; keep the line for the next run.
                _local.WriteFallback(line);
                return false;
            }
            try
            {
                _transport.Send(line);
                return true;
            }
            catch (Exception ex)
            {
                _local.WriteFallback(line);
                _local.WriteSelfError("Resend Send failed: " + ex.Message);
                return false;
            }
        }

        /// <summary>
        /// Waits (up to <paramref name="timeout"/>) until every accepted message has been handled by the
        /// worker, then flushes the transport. Intended for tests and shutdown; no-op after Dispose.
        /// </summary>
        /// <param name="timeout">Upper bound for the queue wait; also passed to the transport flush.</param>
        public void Flush(TimeSpan timeout)
        {
            if (_disposed) return;
            try
            {
                var deadline = DateTime.UtcNow + timeout;
                // Wait until messages are actually handled (not merely dequeued into the worker's current
                // batch), unless the worker has stopped and will never get there.
                while (Volatile.Read(ref _pending) > 0 && !_worker.IsCompleted && DateTime.UtcNow < deadline)
                    Thread.Sleep(20);
                _transport.Flush(timeout);
            }
            catch (Exception ex)
            {
                _local.WriteSelfError("Flush failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Shuts the pipeline down and disposes the transport. Failures are recorded rather than thrown.
        /// The steps are:
        /// <list type="number">
        /// <item>Stop the resend timer and wait (bounded) for a running cycle.</item>
        /// <item>Complete the channel and let the worker drain it, cancelling if that takes too long.</item>
        /// <item>Write anything still queued to the local fallback.</item>
        /// <item>Flush the transport.</item>
        /// </list>
        /// </summary>
        public void Dispose()
        {
            // Atomic claim: concurrent or repeated Dispose calls must not dispose the transport/CTS twice.
            if (Interlocked.Exchange(ref _disposeState, 1) != 0) return;
            _disposed = true;
            try
            {
                StopResendTimer(TimeSpan.FromSeconds(3));

                // Refuse new writes; the worker leaves its loop by itself once the remaining items are drained.
                _channel.Writer.TryComplete();
                if (!WaitForWorker(TimeSpan.FromSeconds(3)))
                {
                    // Graceful drain is taking too long: cancel the consumer's wait and give it a moment to exit.
                    _cts.Cancel();
                    WaitForWorker(TimeSpan.FromSeconds(1));
                }

                if (_worker.IsCompleted)
                {
                    // Items the worker never reached would otherwise be lost; persist them locally.
                    // Reading here is only safe once the single reader (the worker) has stopped.
                    DrainQueueToFallback();
                }
                else
                {
                    _local.WriteSelfError("Pipeline worker did not stop in time; queued messages were not moved to fallback.");
                }

                _transport.Flush(TimeSpan.FromSeconds(3));
            }
            catch (Exception ex)
            {
                _local.WriteSelfError("Pipeline dispose failed: " + ex.Message);
            }
            finally
            {
                try
                {
                    // Ensure a still-running worker observes shutdown before the source is disposed.
                    _cts.Cancel();
                }
                catch (Exception ex)
                {
                    _local.WriteSelfError("Pipeline cancel failed: " + ex.Message);
                }

                try
                {
                    _transport.Dispose();
                }
                catch (Exception ex)
                {
                    _local.WriteSelfError("Transport dispose failed: " + ex.Message);
                }

                _cts.Dispose();
            }
        }

        /// <summary>
        /// Disposes the resend timer, then waits up to <paramref name="timeout"/> for an already-running
        /// <see cref="TryResend"/> cycle to finish. <see cref="Timer.Dispose()"/> does not wait for it.
        /// </summary>
        private void StopResendTimer(TimeSpan timeout)
        {
            if (_resendTimer == null) return;
            try
            {
                _resendTimer.Dispose();
            }
            catch (Exception ex)
            {
                _local.WriteSelfError("Resend timer dispose failed: " + ex.Message);
            }

            var deadline = DateTime.UtcNow + timeout;
            while (Volatile.Read(ref _resending) != 0 && DateTime.UtcNow < deadline)
                Thread.Sleep(20);
        }

        /// <summary>
        /// Waits for the worker task without throwing. Returns true once it is no longer running.
        /// </summary>
        private bool WaitForWorker(TimeSpan timeout)
        {
            try
            {
                return _worker.Wait(timeout);
            }
            catch (AggregateException)
            {
                // Faulted or cancelled: either way the worker has stopped.
                return true;
            }
        }

        /// <summary>
        /// Moves every message still in the channel to the local fallback file and records how many were
        /// moved. Call only after the worker (the channel's single reader) has stopped.
        /// </summary>
        private void DrainQueueToFallback()
        {
            var drained = 0;
            while (_channel.Reader.TryRead(out var message))
            {
                try
                {
                    _local.WriteFallback(SafeJson(message));
                }
                finally
                {
                    Interlocked.Decrement(ref _pending);
                }
                drained++;
            }

            if (drained > 0)
                _local.WriteSelfError($"Pipeline shutdown: {drained} queued message(s) written to fallback.");
        }
    }
}