From 95b84eb3bd86a1cab5230d54774e6820ea6617f3 Mon Sep 17 00:00:00 2001 From: xiaolipro <2357729423@qq.com> Date: Wed, 31 Jan 2024 15:53:07 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B5=81=E9=87=8F?= =?UTF-8?q?=E5=88=86=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Gateway/Dto/FlowStatisticsDto.cs | 27 +++ .../ConnectionBuilderExtensions.cs | 33 ++++ .../FlowAnalytics/DelegatingDuplexPipe.cs | 67 +++++++ .../FlowAnalytics/DelegatingStream.cs | 136 +++++++++++++ .../FlowAnalytics/DuplexPipeStream.cs | 179 ++++++++++++++++++ .../FlowAnalytics/FlowAnalyzeDuplexPipe.cs | 11 ++ .../FlowAnalytics/FlowAnalyzeMiddleware.cs | 28 +++ .../FlowAnalytics/FlowAnalyzeStream.cs | 65 +++++++ .../Middlewares/FlowAnalytics/FlowAnalyzer.cs | 98 ++++++++++ .../Middlewares/FlowAnalytics/FlowType.cs | 17 ++ .../FlowAnalytics/IFlowAnalyzer.cs | 17 ++ .../Middlewares/FlowAnalytics/TaskToApm.cs | 100 ++++++++++ src/Gateway/Middlewares/IKestrelMiddleware.cs | 11 ++ src/Gateway/Program.cs | 14 +- src/Gateway/Services/SystemService.cs | 16 +- 15 files changed, 814 insertions(+), 5 deletions(-) create mode 100644 src/Gateway/Dto/FlowStatisticsDto.cs create mode 100644 src/Gateway/Middlewares/ConnectionBuilderExtensions.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowType.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs create mode 100644 src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs create mode 100644 src/Gateway/Middlewares/IKestrelMiddleware.cs diff --git a/src/Gateway/Dto/FlowStatisticsDto.cs b/src/Gateway/Dto/FlowStatisticsDto.cs new file mode 100644 index 0000000..7d677c2 --- /dev/null +++ b/src/Gateway/Dto/FlowStatisticsDto.cs @@ -0,0 +1,27 @@ +namespace Gateway.Dto; + +/// +/// 流量统计 +/// +public record FlowStatisticsDto +{ + /// + /// 获取总读上行 + /// + public long TotalRead { get; init; } + + /// + /// 获取总下行 + /// + public long TotalWrite { get; init; } + + /// + /// 获取读取速率 + /// + public double ReadRate { get; init; } + + /// + /// 获取写入速率 + /// + public double WriteRate { get; init; } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/ConnectionBuilderExtensions.cs b/src/Gateway/Middlewares/ConnectionBuilderExtensions.cs new file mode 100644 index 0000000..25fa1fb --- /dev/null +++ b/src/Gateway/Middlewares/ConnectionBuilderExtensions.cs @@ -0,0 +1,33 @@ +using Microsoft.AspNetCore.Connections; + +namespace Gateway.Middlewares; + +/// +/// IConnectionBuilder扩展 +/// +public static class ConnectionBuilderExtensions +{ + /// + /// 使用Kestrel中间件 + /// + /// + /// + /// + public static IConnectionBuilder Use(this IConnectionBuilder builder) + where TMiddleware : IKestrelMiddleware + { + var middleware = ActivatorUtilities.GetServiceOrCreateInstance(builder.ApplicationServices); + return builder.Use(middleware); + } + + /// + /// 使用Kestrel中间件 + /// + /// + /// + /// + public static IConnectionBuilder Use(this IConnectionBuilder builder, IKestrelMiddleware middleware) + { + return builder.Use(next => context => middleware.InvokeAsync(next, context)); + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs new file mode 100644 index 0000000..632da93 --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs @@ -0,0 +1,67 @@ +using System.IO.Pipelines; + +namespace Gateway.Middlewares.FlowAnalytics; + +/// +/// 基于委托流的DuplexPipe +/// +/// +public class DelegatingDuplexPipe : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream +{ + private bool disposed; + private readonly object syncRoot = new(); + + /// + /// 输入对象 + /// + public PipeReader Input { get; } + + /// + /// 输出对象 + /// + public PipeWriter Output { get; } + + /// + /// 基于委托流的DuplexPipe + /// + /// + /// 委托流工厂 + public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func delegatingStreamFactory) : + this(duplexPipe, delegatingStreamFactory, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true)) + { + } + + /// + /// 基于委托流的DuplexPipe + /// + /// + /// 委托流工厂 + /// + /// + public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func delegatingStreamFactory, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions) + { + var duplexPipeStream = new DuplexPipeStream(duplexPipe); + var delegatingStream = delegatingStreamFactory(duplexPipeStream); + this.Input = PipeReader.Create(delegatingStream, readerOptions); + this.Output = PipeWriter.Create(delegatingStream, writerOptions); + } + + /// + /// 释放资源 + /// + /// + public virtual async ValueTask DisposeAsync() + { + lock (this.syncRoot) + { + if (this.disposed == true) + { + return; + } + this.disposed = true; + } + + await this.Input.CompleteAsync(); + await this.Output.CompleteAsync(); + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs b/src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs new file mode 100644 index 0000000..3e4abc9 --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs @@ -0,0 +1,136 @@ +namespace Gateway.Middlewares.FlowAnalytics; + +/// +/// 委托流 +/// +public abstract class DelegatingStream : Stream +{ + /// + /// 获取所包装的流对象 + /// + protected Stream Inner { get; } + + /// + /// 委托流 + /// + /// + public DelegatingStream(Stream inner) + { + Inner = inner; + } + + /// + public override bool CanRead => Inner.CanRead; + + /// + public override bool CanSeek => Inner.CanSeek; + + /// + public override bool CanWrite => Inner.CanWrite; + + /// + public override long Length => Inner.Length; + + /// + public override long Position + { + get => Inner.Position; + set => Inner.Position = value; + } + + /// + public override void Flush() + { + Inner.Flush(); + } + + /// + public override Task FlushAsync(CancellationToken cancellationToken) + { + return Inner.FlushAsync(cancellationToken); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + return Inner.Read(buffer, offset, count); + } + + /// + public override int Read(Span destination) + { + return Inner.Read(destination); + } + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return Inner.ReadAsync(buffer, offset, count, cancellationToken); + } + + /// + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return Inner.ReadAsync(destination, cancellationToken); + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + return Inner.Seek(offset, origin); + } + + /// + public override void SetLength(long value) + { + Inner.SetLength(value); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + { + Inner.Write(buffer, offset, count); + } + + /// + public override void Write(ReadOnlySpan source) + { + Inner.Write(source); + } + + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return Inner.WriteAsync(buffer, offset, count, cancellationToken); + } + + /// + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + return Inner.WriteAsync(source, cancellationToken); + } + + /// + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + /// + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + /// + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + /// + public override void EndWrite(IAsyncResult asyncResult) + { + TaskToApm.End(asyncResult); + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs b/src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs new file mode 100644 index 0000000..85806cb --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs @@ -0,0 +1,179 @@ +using System.Buffers; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; + +namespace Gateway.Middlewares.FlowAnalytics; + +/// +/// IDuplexPipe封装为Stream +/// +public class DuplexPipeStream : Stream +{ + private readonly PipeReader input; + private readonly PipeWriter output; + private readonly bool throwOnCancelled; + private volatile bool cancelCalled; + + /// + /// IDuplexPipe封装为Stream + /// + /// + /// + public DuplexPipeStream(IDuplexPipe duplexPipe, bool throwOnCancelled = false) + { + input = duplexPipe.Input; + output = duplexPipe.Output; + this.throwOnCancelled = throwOnCancelled; + } + + /// + /// 取消挂起的读取操作 + /// + public void CancelPendingRead() + { + cancelCalled = true; + input.CancelPendingRead(); + } + + /// + public override bool CanRead => true; + + /// + public override bool CanSeek => false; + + /// + public override bool CanWrite => true; + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + /// + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + var task = ReadAsyncInternal(new Memory(buffer, offset, count), default); + return task.IsCompleted ? task.Result : task.AsTask().GetAwaiter().GetResult(); + } + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + /// + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(destination, cancellationToken); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + /// + public override async Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken) + { + await output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken); + } + + /// + public override async ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + await output.WriteAsync(source, cancellationToken); + } + + /// + public override void Flush() + { + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + + /// + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await output.FlushAsync(cancellationToken); + } + + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + private async ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) + { + while (true) + { + var result = await input.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + try + { + if (throwOnCancelled && result.IsCanceled && cancelCalled) + { + // Reset the bool + cancelCalled = false; + throw new OperationCanceledException(); + } + + if (!readableBuffer.IsEmpty) + { + // buffer.Count is int + var count = (int)Math.Min(readableBuffer.Length, destination.Length); + readableBuffer = readableBuffer.Slice(0, count); + readableBuffer.CopyTo(destination.Span); + return count; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + input.AdvanceTo(readableBuffer.End, readableBuffer.End); + } + } + } + + /// + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + /// + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + /// + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + /// + public override void EndWrite(IAsyncResult asyncResult) + { + TaskToApm.End(asyncResult); + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs new file mode 100644 index 0000000..0597ebc --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs @@ -0,0 +1,11 @@ +using System.IO.Pipelines; + +namespace Gateway.Middlewares.FlowAnalytics; + +sealed class FlowAnalyzeDuplexPipe : DelegatingDuplexPipe +{ + public FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) : + base(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer)) + { + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs new file mode 100644 index 0000000..8cf53b2 --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs @@ -0,0 +1,28 @@ +using Microsoft.AspNetCore.Connections; + +namespace Gateway.Middlewares.FlowAnalytics; + +public class FlowAnalyzeMiddleware : IKestrelMiddleware +{ + private readonly IFlowAnalyzer flowAnalyzer; + + public FlowAnalyzeMiddleware(IFlowAnalyzer flowAnalyzer) + { + this.flowAnalyzer = flowAnalyzer; + } + + public async Task InvokeAsync(ConnectionDelegate next, ConnectionContext context) + { + var oldTransport = context.Transport; + try + { + await using var duplexPipe = new FlowAnalyzeDuplexPipe(context.Transport, flowAnalyzer); + context.Transport = duplexPipe; + await next(context); + } + finally + { + context.Transport = oldTransport; + } + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs new file mode 100644 index 0000000..1d3f32d --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs @@ -0,0 +1,65 @@ +namespace Gateway.Middlewares.FlowAnalytics; + +sealed class FlowAnalyzeStream : DelegatingStream +{ + private readonly IFlowAnalyzer flowAnalyzer; + + public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer) + : base(inner) + { + this.flowAnalyzer = flowAnalyzer; + } + + public override int Read(byte[] buffer, int offset, int count) + { + int read = base.Read(buffer, offset, count); + this.flowAnalyzer.OnFlow(FlowType.Read, read); + return read; + } + + public override int Read(Span destination) + { + int read = base.Read(destination); + this.flowAnalyzer.OnFlow(FlowType.Read, read); + return read; + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken); + this.flowAnalyzer.OnFlow(FlowType.Read, read); + return read; + } + + public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + int read = await base.ReadAsync(destination, cancellationToken); + this.flowAnalyzer.OnFlow(FlowType.Read, read); + return read; + } + + + public override void Write(byte[] buffer, int offset, int count) + { + this.flowAnalyzer.OnFlow(FlowType.Wirte, count); + base.Write(buffer, offset, count); + } + + public override void Write(ReadOnlySpan source) + { + this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); + base.Write(source); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + this.flowAnalyzer.OnFlow(FlowType.Wirte, count); + return base.WriteAsync(buffer, offset, count, cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); + return base.WriteAsync(source, cancellationToken); + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs new file mode 100644 index 0000000..55636f4 --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs @@ -0,0 +1,98 @@ +namespace Gateway.Middlewares.FlowAnalytics; + +public sealed class FlowAnalyzer : IFlowAnalyzer +{ + private const int INTERVAL_SECONDS = 5; + private readonly FlowQueues readQueues = new(INTERVAL_SECONDS); + private readonly FlowQueues writeQueues = new(INTERVAL_SECONDS); + + /// + /// 收到数据 + /// + /// + /// + public void OnFlow(FlowType flowType, int length) + { + if (flowType == FlowType.Read) + { + this.readQueues.OnFlow(length); + } + else + { + this.writeQueues.OnFlow(length); + } + } + + /// + /// 获取流量分析 + /// + /// + public FlowStatisticsDto GetFlowStatistics() + { + return new FlowStatisticsDto + { + TotalRead = this.readQueues.TotalBytes, + TotalWrite = this.writeQueues.TotalBytes, + ReadRate = this.readQueues.GetRate(), + WriteRate = this.writeQueues.GetRate() + }; + } + + private class FlowQueues + { + private int cleaning = 0; + private long totalBytes = 0L; + private record QueueItem(long Ticks, int Length); + private readonly ConcurrentQueue queues = new(); + + private readonly int intervalSeconds; + + public long TotalBytes => this.totalBytes; + + public FlowQueues(int intervalSeconds) + { + this.intervalSeconds = intervalSeconds; + } + + public void OnFlow(int length) + { + Interlocked.Add(ref this.totalBytes, length); + this.CleanInvalidRecords(); + this.queues.Enqueue(new QueueItem(Environment.TickCount64, length)); + } + + public double GetRate() + { + this.CleanInvalidRecords(); + return (double)this.queues.Sum(item => item.Length) / this.intervalSeconds; + } + + /// + /// 清除无效记录 + /// + /// + private bool CleanInvalidRecords() + { + if (Interlocked.CompareExchange(ref this.cleaning, 1, 0) != 0) + { + return false; + } + + var ticks = Environment.TickCount64; + while (this.queues.TryPeek(out var item)) + { + if (ticks - item.Ticks < this.intervalSeconds * 1000) + { + break; + } + else + { + this.queues.TryDequeue(out _); + } + } + + Interlocked.Exchange(ref this.cleaning, 0); + return true; + } + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowType.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowType.cs new file mode 100644 index 0000000..218eb11 --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowType.cs @@ -0,0 +1,17 @@ +namespace Gateway.Middlewares.FlowAnalytics; + +/// +/// 流量类型 +/// +public enum FlowType +{ + /// + /// 读取 + /// + Read, + + /// + /// 写入 + /// + Wirte +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs new file mode 100644 index 0000000..8f680d9 --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs @@ -0,0 +1,17 @@ +namespace Gateway.Middlewares.FlowAnalytics; + +public interface IFlowAnalyzer +{ + /// + /// 收到数据 + /// + /// + /// + void OnFlow(FlowType flowType, int length); + + /// + /// 获取速率 + /// + /// + FlowStatisticsDto GetFlowStatistics(); +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs b/src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs new file mode 100644 index 0000000..9e4f7fa --- /dev/null +++ b/src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs @@ -0,0 +1,100 @@ +namespace Gateway.Middlewares.FlowAnalytics; + +static class TaskToApm +{ + /// + /// Marshals the Task as an IAsyncResult, using the supplied callback and state + /// to implement the APM pattern. + /// + /// The Task to be marshaled. + /// The callback to be invoked upon completion. + /// The state to be stored in the IAsyncResult. + /// An IAsyncResult to represent the task's asynchronous operation. + public static IAsyncResult Begin(Task task, AsyncCallback? callback, object? state) => + new TaskAsyncResult(task, state, callback); + + /// Processes an IAsyncResult returned by Begin. + /// The IAsyncResult to unwrap. + public static void End(IAsyncResult asyncResult) + { + if (asyncResult is TaskAsyncResult twar) + { + twar._task.GetAwaiter().GetResult(); + return; + } + + throw new ArgumentNullException(); + } + + /// Processes an IAsyncResult returned by Begin. + /// The IAsyncResult to unwrap. + public static TResult End(IAsyncResult asyncResult) + { + if (asyncResult is TaskAsyncResult twar && twar._task is Task task) + { + return task.GetAwaiter().GetResult(); + } + + throw new ArgumentNullException(); + } + + /// Provides a simple IAsyncResult that wraps a Task. + /// + /// We could use the Task as the IAsyncResult if the Task's AsyncState is the same as the object state, + /// but that's very rare, in particular in a situation where someone cares about allocation, and always + /// using TaskAsyncResult simplifies things and enables additional optimizations. + /// + internal sealed class TaskAsyncResult : IAsyncResult + { + /// The wrapped Task. + internal readonly Task _task; + /// Callback to invoke when the wrapped task completes. + private readonly AsyncCallback? _callback; + + /// Initializes the IAsyncResult with the Task to wrap and the associated object state. + /// The Task to wrap. + /// The new AsyncState value. + /// Callback to invoke when the wrapped task completes. + internal TaskAsyncResult(Task task, object? state, AsyncCallback? callback) + { + Debug.Assert(task != null); + _task = task; + AsyncState = state; + + if (task.IsCompleted) + { + // Synchronous completion. Invoke the callback. No need to store it. + CompletedSynchronously = true; + callback?.Invoke(this); + } + else if (callback != null) + { + // Asynchronous completion, and we have a callback; schedule it. We use OnCompleted rather than ContinueWith in + // order to avoid running synchronously if the task has already completed by the time we get here but still run + // synchronously as part of the task's completion if the task completes after (the more common case). + _callback = callback; + _task.ConfigureAwait(continueOnCapturedContext: false) + .GetAwaiter() + .OnCompleted(InvokeCallback); // allocates a delegate, but avoids a closure + } + } + + /// Invokes the callback. + private void InvokeCallback() + { + Debug.Assert(!CompletedSynchronously); + Debug.Assert(_callback != null); + _callback.Invoke(this); + } + + /// Gets a user-defined object that qualifies or contains information about an asynchronous operation. + public object? AsyncState { get; } + /// Gets a value that indicates whether the asynchronous operation completed synchronously. + /// This is set lazily based on whether the has completed by the time this object is created. + public bool CompletedSynchronously { get; } + /// Gets a value that indicates whether the asynchronous operation has completed. + public bool IsCompleted => _task.IsCompleted; + /// Gets a that is used to wait for an asynchronous operation to complete. + public WaitHandle AsyncWaitHandle => ((IAsyncResult)_task).AsyncWaitHandle; + } +} \ No newline at end of file diff --git a/src/Gateway/Middlewares/IKestrelMiddleware.cs b/src/Gateway/Middlewares/IKestrelMiddleware.cs new file mode 100644 index 0000000..7db518b --- /dev/null +++ b/src/Gateway/Middlewares/IKestrelMiddleware.cs @@ -0,0 +1,11 @@ +using Microsoft.AspNetCore.Connections; + +namespace Gateway.Middlewares; + +/// +/// Kestrel的中间件接口 +/// +public interface IKestrelMiddleware +{ + Task InvokeAsync(ConnectionDelegate next, ConnectionContext context); +} \ No newline at end of file diff --git a/src/Gateway/Program.cs b/src/Gateway/Program.cs index c50e208..65f0763 100644 --- a/src/Gateway/Program.cs +++ b/src/Gateway/Program.cs @@ -1,12 +1,14 @@ #region FreeSql类型转换 +using Gateway.Middlewares.FlowAnalytics; + Utils.TypeHandlers.TryAdd(typeof(Dictionary), new StringJsonHandler>()); Utils.TypeHandlers.TryAdd(typeof(List), new StringJsonHandler>()); Utils.TypeHandlers.TryAdd(typeof(string[]), new StringJsonHandler()); #endregion -var builder = WebApplication.CreateBuilder(args); +var builder = WebApplication.CreateSlimBuilder(args); var directory = new DirectoryInfo("/data"); if (!directory.Exists) @@ -61,6 +63,8 @@ builder.WebHost.UseKestrel(options => }); }); +builder.Services.AddSingleton(); + builder.WebHost.ConfigureKestrel(kestrel => { kestrel.Limits.MaxRequestBodySize = null; @@ -69,9 +73,14 @@ builder.WebHost.ConfigureKestrel(kestrel => { portOptions.Protocols = HttpProtocols.Http1AndHttp2AndHttp3; portOptions.UseHttps(); + portOptions.Use(); }); - kestrel.ListenAnyIP(8080, portOptions => { portOptions.Protocols = HttpProtocols.Http1AndHttp2; }); + kestrel.ListenAnyIP(8080, portOptions => + { + portOptions.Protocols = HttpProtocols.Http1AndHttp2; + portOptions.Use(); + }); }); #region Jwt @@ -124,6 +133,7 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +builder.Services.AddSingleton(); builder.Services.AddSingleton(); diff --git a/src/Gateway/Services/SystemService.cs b/src/Gateway/Services/SystemService.cs index 830edf9..c4b5e03 100644 --- a/src/Gateway/Services/SystemService.cs +++ b/src/Gateway/Services/SystemService.cs @@ -1,6 +1,8 @@ -namespace Gateway.Services; +using Gateway.Middlewares.FlowAnalytics; -public class SystemService +namespace Gateway.Services; + +public class SystemService(IFlowAnalyzer flowAnalyzer) { public static async Task StreamAsync(HttpContext context) { @@ -27,7 +29,7 @@ public class SystemService initialBytesSent += interfaceStats.BytesSent; initialBytesReceived += interfaceStats.BytesReceived; } - + // 等待1秒钟 await Task.Delay(1000, context.RequestAborted); @@ -65,6 +67,11 @@ public class SystemService } } } + + public FlowStatisticsDto FlowStatistics() + { + return flowAnalyzer.GetFlowStatistics(); + } } public static class SystemExtension @@ -73,5 +80,8 @@ public static class SystemExtension { app.MapGet("/api/gateway/system", async context => await SystemService.StreamAsync(context)); + + app.MapGet("/api/gateway/system/flow-statistics", + (SystemService systemService) => systemService.FlowStatistics()); } } \ No newline at end of file -- Gitee From c9b4718c2b88f1fc0eb5b5d1d91de8a13b726c8d Mon Sep 17 00:00:00 2001 From: token <239573049@qq.com> Date: Wed, 31 Jan 2024 19:09:48 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=81=E9=87=8F?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=20=E5=9F=BA=E4=BA=8EKestrel=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E7=9B=91=E6=8E=A7=E5=87=BA=E5=85=A5=E5=8F=A3=E6=B5=81?= =?UTF-8?q?=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 40 +++++++++- .../BackgroundServices/GatewayBackground.cs | 14 +++- src/Gateway/Dto/NetWorkDto.cs | 33 ++++++++ src/Gateway/Entities/SystemLoggerEntity.cs | 10 ++- .../FlowAnalytics/DelegatingDuplexPipe.cs | 10 +-- .../FlowAnalytics/FlowAnalyzeDuplexPipe.cs | 9 +-- .../FlowAnalytics/FlowAnalyzeMiddleware.cs | 9 +-- .../FlowAnalytics/FlowAnalyzeStream.cs | 26 +++--- .../Middlewares/FlowAnalytics/FlowAnalyzer.cs | 68 ++++++++-------- .../FlowAnalytics/IFlowAnalyzer.cs | 5 ++ src/Gateway/Program.cs | 17 +++- src/Gateway/Services/SystemService.cs | 41 +++++++--- src/Gateway/TokenBucket.cs | 37 +++++++++ web/src/pages/home/index.tsx | 79 +++++++++++-------- 14 files changed, 277 insertions(+), 121 deletions(-) create mode 100644 src/Gateway/TokenBucket.cs diff --git a/README.md b/README.md index 1880e08..7f54968 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ Gateway提供了基本的管理服务,提供简单的登录授权,和实时 - [x] dashboard监控 - [x] 静态文件服务代理 - [x] 穿透隧道功能 +- [x] 出入口流量监控 - [ ] 动态插件管理 ## 技术栈 @@ -29,7 +30,6 @@ Gateway提供了基本的管理服务,提供简单的登录授权,和实时 - semi 用于提供基础组件 - react-router-dom 用于路由管理 - ## 镜像执行指令 ```bash @@ -118,6 +118,7 @@ services: `/app/certificates`: - 这个是系统证书默认存放目录,如果映射了目录则需要提供自己的证书。 + ## 使用隧道 ```yml @@ -152,7 +153,42 @@ services: 增加`TUNNEL_PASSWORD`环境变量,默认为空不设置密码 -下载隧道客户端 https://gitee.com/hejiale010426/Gateway/releases 然后解压压缩包,打开appsettings.json文件修改Tunnel节点的Url,如果Gateway使用了TUNNEL_PASSWORD,那么你的URL应该是`https://localhost:8081/api/gateway/connect-h2?host=backend1.app&password=dd666666`, +下载隧道客户端 然后解压压缩包,打开appsettings.json文件修改Tunnel节点的Url,如果Gateway使用了TUNNEL_PASSWORD,那么你的URL应该是`https://localhost:8081/api/gateway/connect-h2?host=backend1.app&password=dd666666`, `host`是在集群中的集群端点的域名,这个域名就是定义到我们的隧道客户端的`host`的这个参数,请保证值的唯一性,当绑定集群的路由匹配成功以后则会访问图片定义的端点,如果并没有存在节点那么他会直接代理。 ![输入图片说明](img/%E9%9B%86%E7%BE%A4-01.png.png) + +## 出入流量监控 + +使用环境变量控制是否启用流量监控,使用环境变量`ENABLE_FLOW_MONITORING`设置我们是否启用流量监控,如果为空则默认启动流量监控,然后可以打开我们的控制面板查看流量监控的数据。 + +```yml + +services: + gateway-api: + image: registry.cn-shenzhen.aliyuncs.com/tokengo/gateway-api + restart: always + container_name: gateway-api + environment: + USER: root + PASS: Aa010426. + HTTPS_PASSWORD: dd666666 + HTTPS_FILE: gateway.pfx + ENABLE_FLOW_MONITORING: true + ports: + - 8200:8080 + volumes: + - ./data:/data/ + - ./app/certificates:/app/certificates + + gateway-web: + image: registry.cn-shenzhen.aliyuncs.com/tokengo/gateway-web + restart: always + container_name: gateway-web + privileged: true + environment: + api_url: http://token-ai.cn:8200 + ports: + - 10800:80 + +``` diff --git a/src/Gateway/BackgroundServices/GatewayBackground.cs b/src/Gateway/BackgroundServices/GatewayBackground.cs index 4db3a71..2d63e0a 100644 --- a/src/Gateway/BackgroundServices/GatewayBackground.cs +++ b/src/Gateway/BackgroundServices/GatewayBackground.cs @@ -1,8 +1,11 @@ -namespace Gateway.BackgroundServices; +using Gateway.Middlewares.FlowAnalytics; + +namespace Gateway.BackgroundServices; public class GatewayBackgroundService( GatewayService gatewayService, CertificateService certificateService, + IFlowAnalyzer flowAnalyzer, IFreeSql freeSql) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -42,15 +45,20 @@ public class GatewayBackgroundService( #endregion + var flow = flowAnalyzer.GetFlowStatistics(); + var systemLoggerEntity = new SystemLoggerEntity { RequestCount = GatewayMiddleware.CurrentRequestCount, ErrorRequestCount = GatewayMiddleware.CurrentErrorCount, - CurrentTime = DateTime.Now.AddDays(-1) + CurrentTime = DateTime.Now.AddDays(-1), + ReadRate = flow.TotalRead, + WriteRate = flow.TotalWrite }; - + // 清空请求计数器 GatewayMiddleware.ClearRequestCount(); + flowAnalyzer.CleanRecords(); await freeSql.Insert(systemLoggerEntity).ExecuteAffrowsAsync(); } diff --git a/src/Gateway/Dto/NetWorkDto.cs b/src/Gateway/Dto/NetWorkDto.cs index a0fdc69..729fbe3 100644 --- a/src/Gateway/Dto/NetWorkDto.cs +++ b/src/Gateway/Dto/NetWorkDto.cs @@ -60,4 +60,37 @@ public class NetWorkDto(long received, long sent) get => _totalErrorCount + CurrentErrorCount; set => value = _totalErrorCount; } + + + /// + /// 获取读取速率 + /// + public double ReadRate { get; init; } + + /// + /// 获取写入速率 + /// + public double WriteRate { get; init; } + + private double _totalRead; + + /// + /// 获取总读上行 + /// + public double TotalRead + { + get => (double)((decimal)_totalRead + (decimal)ReadRate); + init => _totalRead = value; + } + + private double _totalWrite; + + /// + /// 获取总下行 + /// + public double TotalWrite + { + get => (double)((decimal)_totalWrite + (decimal)WriteRate); + init => _totalWrite = value; + } } \ No newline at end of file diff --git a/src/Gateway/Entities/SystemLoggerEntity.cs b/src/Gateway/Entities/SystemLoggerEntity.cs index 89ad2ec..7bef5fe 100644 --- a/src/Gateway/Entities/SystemLoggerEntity.cs +++ b/src/Gateway/Entities/SystemLoggerEntity.cs @@ -17,5 +17,13 @@ public sealed class SystemLoggerEntity : Entity /// public DateTime CurrentTime { get; set; } - + /// + /// 获取读取速率 + /// + public double ReadRate { get; init; } + + /// + /// 获取写入速率 + /// + public double WriteRate { get; init; } } \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs index 632da93..a2cf5f8 100644 --- a/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs +++ b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs @@ -8,8 +8,8 @@ namespace Gateway.Middlewares.FlowAnalytics; /// public class DelegatingDuplexPipe : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream { - private bool disposed; - private readonly object syncRoot = new(); + private bool _disposed; + private readonly object _syncRoot = new(); /// /// 输入对象 @@ -52,13 +52,13 @@ public class DelegatingDuplexPipe : IDuplexPipe, IAsyncDispos /// public virtual async ValueTask DisposeAsync() { - lock (this.syncRoot) + lock (this._syncRoot) { - if (this.disposed == true) + if (this._disposed == true) { return; } - this.disposed = true; + this._disposed = true; } await this.Input.CompleteAsync(); diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs index 0597ebc..92de095 100644 --- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs @@ -2,10 +2,5 @@ using System.IO.Pipelines; namespace Gateway.Middlewares.FlowAnalytics; -sealed class FlowAnalyzeDuplexPipe : DelegatingDuplexPipe -{ - public FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) : - base(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer)) - { - } -} \ No newline at end of file +sealed class FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) + : DelegatingDuplexPipe(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer)); \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs index 8cf53b2..60c0152 100644 --- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs @@ -2,15 +2,8 @@ namespace Gateway.Middlewares.FlowAnalytics; -public class FlowAnalyzeMiddleware : IKestrelMiddleware +public sealed class FlowAnalyzeMiddleware(IFlowAnalyzer flowAnalyzer) : IKestrelMiddleware { - private readonly IFlowAnalyzer flowAnalyzer; - - public FlowAnalyzeMiddleware(IFlowAnalyzer flowAnalyzer) - { - this.flowAnalyzer = flowAnalyzer; - } - public async Task InvokeAsync(ConnectionDelegate next, ConnectionContext context) { var oldTransport = context.Transport; diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs index 1d3f32d..1ac61a7 100644 --- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs @@ -1,65 +1,57 @@ namespace Gateway.Middlewares.FlowAnalytics; -sealed class FlowAnalyzeStream : DelegatingStream +sealed class FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer) : DelegatingStream(inner) { - private readonly IFlowAnalyzer flowAnalyzer; - - public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer) - : base(inner) - { - this.flowAnalyzer = flowAnalyzer; - } - public override int Read(byte[] buffer, int offset, int count) { int read = base.Read(buffer, offset, count); - this.flowAnalyzer.OnFlow(FlowType.Read, read); + flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override int Read(Span destination) { int read = base.Read(destination); - this.flowAnalyzer.OnFlow(FlowType.Read, read); + flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken); - this.flowAnalyzer.OnFlow(FlowType.Read, read); + flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) { int read = await base.ReadAsync(destination, cancellationToken); - this.flowAnalyzer.OnFlow(FlowType.Read, read); + flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override void Write(byte[] buffer, int offset, int count) { - this.flowAnalyzer.OnFlow(FlowType.Wirte, count); + flowAnalyzer.OnFlow(FlowType.Wirte, count); base.Write(buffer, offset, count); } public override void Write(ReadOnlySpan source) { - this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); + flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); base.Write(source); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - this.flowAnalyzer.OnFlow(FlowType.Wirte, count); + flowAnalyzer.OnFlow(FlowType.Wirte, count); return base.WriteAsync(buffer, offset, count, cancellationToken); } public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) { - this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); + flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); return base.WriteAsync(source, cancellationToken); } } \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs index 55636f4..b223d9c 100644 --- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs +++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs @@ -3,8 +3,8 @@ namespace Gateway.Middlewares.FlowAnalytics; public sealed class FlowAnalyzer : IFlowAnalyzer { private const int INTERVAL_SECONDS = 5; - private readonly FlowQueues readQueues = new(INTERVAL_SECONDS); - private readonly FlowQueues writeQueues = new(INTERVAL_SECONDS); + private readonly FlowQueues _readQueues = new(INTERVAL_SECONDS); + private readonly FlowQueues _writeQueues = new(INTERVAL_SECONDS); /// /// 收到数据 @@ -15,14 +15,20 @@ public sealed class FlowAnalyzer : IFlowAnalyzer { if (flowType == FlowType.Read) { - this.readQueues.OnFlow(length); + _readQueues.OnFlow(length); } else { - this.writeQueues.OnFlow(length); + _writeQueues.OnFlow(length); } } + public void CleanRecords() + { + _readQueues.CleanRecords(); + _writeQueues.CleanRecords(); + } + /// /// 获取流量分析 /// @@ -31,40 +37,35 @@ public sealed class FlowAnalyzer : IFlowAnalyzer { return new FlowStatisticsDto { - TotalRead = this.readQueues.TotalBytes, - TotalWrite = this.writeQueues.TotalBytes, - ReadRate = this.readQueues.GetRate(), - WriteRate = this.writeQueues.GetRate() + TotalRead = _readQueues.TotalBytes, + TotalWrite = _writeQueues.TotalBytes, + ReadRate = _readQueues.GetRate(), + WriteRate = _writeQueues.GetRate() }; } - private class FlowQueues + private class FlowQueues(int intervalSeconds) { - private int cleaning = 0; - private long totalBytes = 0L; - private record QueueItem(long Ticks, int Length); - private readonly ConcurrentQueue queues = new(); + private int _cleaning = 0; + private long _totalBytes = 0L; - private readonly int intervalSeconds; + private record QueueItem(long Ticks, int Length); - public long TotalBytes => this.totalBytes; + private readonly ConcurrentQueue _queues = new(); - public FlowQueues(int intervalSeconds) - { - this.intervalSeconds = intervalSeconds; - } + public long TotalBytes => this._totalBytes; public void OnFlow(int length) { - Interlocked.Add(ref this.totalBytes, length); + Interlocked.Add(ref this._totalBytes, length); this.CleanInvalidRecords(); - this.queues.Enqueue(new QueueItem(Environment.TickCount64, length)); + this._queues.Enqueue(new QueueItem(Environment.TickCount64, length)); } public double GetRate() { - this.CleanInvalidRecords(); - return (double)this.queues.Sum(item => item.Length) / this.intervalSeconds; + CleanInvalidRecords(); + return (double)_queues.Sum(item => item.Length) / intervalSeconds; } /// @@ -73,26 +74,31 @@ public sealed class FlowAnalyzer : IFlowAnalyzer /// private bool CleanInvalidRecords() { - if (Interlocked.CompareExchange(ref this.cleaning, 1, 0) != 0) + if (Interlocked.CompareExchange(ref this._cleaning, 1, 0) != 0) { return false; } var ticks = Environment.TickCount64; - while (this.queues.TryPeek(out var item)) + while (_queues.TryPeek(out var item)) { - if (ticks - item.Ticks < this.intervalSeconds * 1000) + if (ticks - item.Ticks < intervalSeconds * 1000) { break; } - else - { - this.queues.TryDequeue(out _); - } + + _queues.TryDequeue(out _); } - Interlocked.Exchange(ref this.cleaning, 0); + Interlocked.Exchange(ref _cleaning, 0); return true; } + + + public void CleanRecords() + { + _queues.Clear(); + Interlocked.Exchange(ref _cleaning, 0); + } } } \ No newline at end of file diff --git a/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs index 8f680d9..90d83b2 100644 --- a/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs +++ b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs @@ -14,4 +14,9 @@ public interface IFlowAnalyzer /// /// FlowStatisticsDto GetFlowStatistics(); + + /// + /// 清楚记录 + /// + void CleanRecords(); } \ No newline at end of file diff --git a/src/Gateway/Program.cs b/src/Gateway/Program.cs index 8e9d27f..cae5217 100644 --- a/src/Gateway/Program.cs +++ b/src/Gateway/Program.cs @@ -1,5 +1,7 @@ #region FreeSql类型转换 +using Gateway.Middlewares.FlowAnalytics; + Utils.TypeHandlers.TryAdd(typeof(Dictionary), new StringJsonHandler>()); Utils.TypeHandlers.TryAdd(typeof(List), new StringJsonHandler>()); Utils.TypeHandlers.TryAdd(typeof(string[]), new StringJsonHandler()); @@ -30,6 +32,8 @@ builder.Configuration.GetSection(GatewayOptions.Name) // 获取环境变量 var https_password = Environment.GetEnvironmentVariable("HTTPS_PASSWORD") ?? "dd666666"; var https_file = Environment.GetEnvironmentVariable("HTTPS_FILE") ?? "gateway.pfx"; +var enable_flow_monitoring = Environment.GetEnvironmentVariable("ENABLE_FLOW_MONITORING") ?? "true"; + builder.WebHost.UseKestrel(options => { @@ -69,12 +73,21 @@ builder.WebHost.ConfigureKestrel(kestrel => { portOptions.Protocols = HttpProtocols.Http1AndHttp2AndHttp3; portOptions.UseHttps(); + + if (enable_flow_monitoring == "true") + { + portOptions.Use(); + } }); kestrel.ListenAnyIP(8080, portOptions => { portOptions.Protocols = HttpProtocols.Http1AndHttp2; - portOptions.Use(); + + if (enable_flow_monitoring == "true") + { + portOptions.Use(); + } }); }); @@ -140,6 +153,8 @@ builder.Services.AddReverseProxy() builder.Services.AddTunnelServices(); +builder.Services.AddSingleton(); + var app = builder.Build(); app.UseCors("AllowAll"); diff --git a/src/Gateway/Services/SystemService.cs b/src/Gateway/Services/SystemService.cs index dbbde70..1850c1d 100644 --- a/src/Gateway/Services/SystemService.cs +++ b/src/Gateway/Services/SystemService.cs @@ -1,14 +1,24 @@ -namespace Gateway.Services; +using Gateway.Middlewares.FlowAnalytics; -public class SystemService +namespace Gateway.Services; + +/// +/// 系统服务 +/// +public static class SystemService { - public static async Task StreamAsync(HttpContext context) + public static async Task StreamAsync(HttpContext context, IFreeSql sql, IFlowAnalyzer flowAnalyzer) { // 使用sse,返回响应头 context.Response.Headers.ContentType = "text/event-stream"; var i = 0; + var totalErrorCount = (double)await sql.Select().SumAsync(x => x.ErrorRequestCount); + var totalRequestCount = (double)await sql.Select().SumAsync(x => x.RequestCount); + var totalRead = (double)await sql.Select().SumAsync(x => x.ReadRate); + var totalWrite = (double)await sql.Select().SumAsync(x => x.WriteRate); + while (!context.RequestAborted.IsCancellationRequested) { // 获取所有网络接口 @@ -27,7 +37,7 @@ public class SystemService initialBytesSent += interfaceStats.BytesSent; initialBytesReceived += interfaceStats.BytesReceived; } - + // 等待1秒钟 await Task.Delay(1000, context.RequestAborted); @@ -49,8 +59,18 @@ public class SystemService var totalBytesSentIn1Sec = bytesSentAfter1Sec - initialBytesSent; var totalBytesReceivedIn1Sec = bytesReceivedAfter1Sec - initialBytesReceived; + var flowStatisticsDto = flowAnalyzer.GetFlowStatistics(); + var data = - $"data:{JsonSerializer.Serialize(new NetWorkDto(totalBytesReceivedIn1Sec, totalBytesSentIn1Sec))}\n\n"; + $"data:{JsonSerializer.Serialize(new NetWorkDto(totalBytesReceivedIn1Sec, totalBytesSentIn1Sec) + { + TotalErrorCount = totalErrorCount, + TotalRequestCount = totalRequestCount, + ReadRate = flowStatisticsDto.TotalRead, + WriteRate = flowStatisticsDto.TotalWrite, + TotalWrite = totalRead, + TotalRead = totalWrite + })}\n\n"; // 将数据写入到响应流中 await context.Response.WriteAsync(data, context.RequestAborted); @@ -58,8 +78,8 @@ public class SystemService i++; - // 只维持5秒的连接 - if (i > 5) + // 只维持10秒的连接 + if (i > 10) { break; } @@ -71,10 +91,7 @@ public static class SystemExtension { public static void MapSystem(this IEndpointRouteBuilder app) { - app.MapGet("/api/gateway/system", async context => - await SystemService.StreamAsync(context)); - - app.MapGet("/api/gateway/system/flow-statistics", - (SystemService systemService) => systemService.FlowStatistics()); + app.MapGet("/api/gateway/system", async (HttpContext context, IFreeSql sql, IFlowAnalyzer flowAnalyzer) => + await SystemService.StreamAsync(context, sql, flowAnalyzer)); } } \ No newline at end of file diff --git a/src/Gateway/TokenBucket.cs b/src/Gateway/TokenBucket.cs new file mode 100644 index 0000000..ec015af --- /dev/null +++ b/src/Gateway/TokenBucket.cs @@ -0,0 +1,37 @@ +namespace Gateway; + +/// +/// 令牌桶,用于限制流量。 +/// +public struct TokenBucket(int maxTokens) +{ + private readonly int _maxTokens = maxTokens; + private int _tokens = maxTokens; + private DateTime _lastCheck = DateTime.UtcNow; + + private bool GetTokens(int count) + { + var now = DateTime.UtcNow; + if ((now - _lastCheck).TotalSeconds > 1) + { + _tokens = _maxTokens; + _lastCheck = now; + } + + if (_tokens < count) return false; + + _tokens -= count; + + return true; + } + + public async ValueTask WaitForTokens(int count) + { + while (!GetTokens(count)) + { + var now = DateTime.UtcNow; + var timeToNextToken = (now - _lastCheck).Microseconds; + await Task.Delay(timeToNextToken); + } + } +} \ No newline at end of file diff --git a/web/src/pages/home/index.tsx b/web/src/pages/home/index.tsx index 88de3ba..4024e69 100644 --- a/web/src/pages/home/index.tsx +++ b/web/src/pages/home/index.tsx @@ -8,6 +8,10 @@ export default function Home() { const [totalErrorCount, setTotalErrorCount] = useState(0); const [currentRequestCount, setCurrentRequestCount] = useState(0); const [currentErrorCount, setCurrentErrorCount] = useState(0); + const [readRate, setReadRate] = useState(""); + const [writeRate, setWriteRate] = useState(""); + const [totalRead, setTotalRead] = useState(""); + const [totalWrite, setTotalWrite] = useState(""); useEffect(() => { var chartDom = document.getElementById('network')!; @@ -58,23 +62,8 @@ export default function Home() { formatter: function (params: any) { // 将字节转换KB MB GB let value = params.value; - let unit = 'B'; - if (value > 1024) { - value = value / 1024; - unit = 'KB'; - } - - if (value > 1024) { - value = value / 1024; - unit = 'MB'; - } - - if (value > 1024) { - value = value / 1024; - unit = 'GB'; - } - - return value.toFixed(2) + unit; + + return formatBytes(value, 2); } } }, @@ -92,23 +81,8 @@ export default function Home() { formatter: function (params: any) { // 将字节转换KB MB GB let value = params.value; - let unit = 'B'; - if (value > 1024) { - value = value / 1024; - unit = 'KB'; - } - - if (value > 1024) { - value = value / 1024; - unit = 'MB'; - } - - if (value > 1024) { - value = value / 1024; - unit = 'GB'; - } - - return value.toFixed(2) + unit; + + return formatBytes(value, 2); } } } @@ -135,6 +109,12 @@ export default function Home() { setTotalErrorCount(chunk.TotalErrorCount); setTotalRequestCount(chunk.TotalRequestCount); + setReadRate(formatBytes(chunk.ReadRate)); + setWriteRate(formatBytes(chunk.WriteRate)); + setTotalRead(formatBytes(chunk.TotalRead)); + setTotalWrite(formatBytes(chunk.TotalWrite)); + + option && myChart.setOption(option); myChart.resize(); @@ -156,6 +136,15 @@ export default function Home() { }; }, []); // 空依赖数组意味着这个effect只会在组件挂载时执行一次 + function formatBytes(bytes: number, decimals = 2) { + if (bytes === 0) return '0 Bytes'; + const k = 1024; + const dm = decimals < 0 ? 0 : decimals; + const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i]; + } + return (
+ + + + 入口流量总计:{totalRead} + + + + + 出口流量总计:{totalWrite} + + + + + 当天入口流量:{readRate} + + + + + 当天出口流量:{writeRate} + + +