From 95b84eb3bd86a1cab5230d54774e6820ea6617f3 Mon Sep 17 00:00:00 2001
From: xiaolipro <2357729423@qq.com>
Date: Wed, 31 Jan 2024 15:53:07 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B5=81=E9=87=8F?=
=?UTF-8?q?=E5=88=86=E6=9E=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/Gateway/Dto/FlowStatisticsDto.cs | 27 +++
.../ConnectionBuilderExtensions.cs | 33 ++++
.../FlowAnalytics/DelegatingDuplexPipe.cs | 67 +++++++
.../FlowAnalytics/DelegatingStream.cs | 136 +++++++++++++
.../FlowAnalytics/DuplexPipeStream.cs | 179 ++++++++++++++++++
.../FlowAnalytics/FlowAnalyzeDuplexPipe.cs | 11 ++
.../FlowAnalytics/FlowAnalyzeMiddleware.cs | 28 +++
.../FlowAnalytics/FlowAnalyzeStream.cs | 65 +++++++
.../Middlewares/FlowAnalytics/FlowAnalyzer.cs | 98 ++++++++++
.../Middlewares/FlowAnalytics/FlowType.cs | 17 ++
.../FlowAnalytics/IFlowAnalyzer.cs | 17 ++
.../Middlewares/FlowAnalytics/TaskToApm.cs | 100 ++++++++++
src/Gateway/Middlewares/IKestrelMiddleware.cs | 11 ++
src/Gateway/Program.cs | 14 +-
src/Gateway/Services/SystemService.cs | 16 +-
15 files changed, 814 insertions(+), 5 deletions(-)
create mode 100644 src/Gateway/Dto/FlowStatisticsDto.cs
create mode 100644 src/Gateway/Middlewares/ConnectionBuilderExtensions.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/FlowType.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs
create mode 100644 src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs
create mode 100644 src/Gateway/Middlewares/IKestrelMiddleware.cs
diff --git a/src/Gateway/Dto/FlowStatisticsDto.cs b/src/Gateway/Dto/FlowStatisticsDto.cs
new file mode 100644
index 0000000..7d677c2
--- /dev/null
+++ b/src/Gateway/Dto/FlowStatisticsDto.cs
@@ -0,0 +1,27 @@
+namespace Gateway.Dto;
+
+///
+/// 流量统计
+///
+public record FlowStatisticsDto
+{
+ ///
+ /// 获取总读上行
+ ///
+ public long TotalRead { get; init; }
+
+ ///
+ /// 获取总下行
+ ///
+ public long TotalWrite { get; init; }
+
+ ///
+ /// 获取读取速率
+ ///
+ public double ReadRate { get; init; }
+
+ ///
+ /// 获取写入速率
+ ///
+ public double WriteRate { get; init; }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/ConnectionBuilderExtensions.cs b/src/Gateway/Middlewares/ConnectionBuilderExtensions.cs
new file mode 100644
index 0000000..25fa1fb
--- /dev/null
+++ b/src/Gateway/Middlewares/ConnectionBuilderExtensions.cs
@@ -0,0 +1,33 @@
+using Microsoft.AspNetCore.Connections;
+
+namespace Gateway.Middlewares;
+
+///
+/// IConnectionBuilder扩展
+///
+public static class ConnectionBuilderExtensions
+{
+ ///
+ /// 使用Kestrel中间件
+ ///
+ ///
+ ///
+ ///
+ public static IConnectionBuilder Use(this IConnectionBuilder builder)
+ where TMiddleware : IKestrelMiddleware
+ {
+ var middleware = ActivatorUtilities.GetServiceOrCreateInstance(builder.ApplicationServices);
+ return builder.Use(middleware);
+ }
+
+ ///
+ /// 使用Kestrel中间件
+ ///
+ ///
+ ///
+ ///
+ public static IConnectionBuilder Use(this IConnectionBuilder builder, IKestrelMiddleware middleware)
+ {
+ return builder.Use(next => context => middleware.InvokeAsync(next, context));
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs
new file mode 100644
index 0000000..632da93
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs
@@ -0,0 +1,67 @@
+using System.IO.Pipelines;
+
+namespace Gateway.Middlewares.FlowAnalytics;
+
+///
+/// 基于委托流的DuplexPipe
+///
+///
+public class DelegatingDuplexPipe : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream
+{
+ private bool disposed;
+ private readonly object syncRoot = new();
+
+ ///
+ /// 输入对象
+ ///
+ public PipeReader Input { get; }
+
+ ///
+ /// 输出对象
+ ///
+ public PipeWriter Output { get; }
+
+ ///
+ /// 基于委托流的DuplexPipe
+ ///
+ ///
+ /// 委托流工厂
+ public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func delegatingStreamFactory) :
+ this(duplexPipe, delegatingStreamFactory, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true))
+ {
+ }
+
+ ///
+ /// 基于委托流的DuplexPipe
+ ///
+ ///
+ /// 委托流工厂
+ ///
+ ///
+ public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func delegatingStreamFactory, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions)
+ {
+ var duplexPipeStream = new DuplexPipeStream(duplexPipe);
+ var delegatingStream = delegatingStreamFactory(duplexPipeStream);
+ this.Input = PipeReader.Create(delegatingStream, readerOptions);
+ this.Output = PipeWriter.Create(delegatingStream, writerOptions);
+ }
+
+ ///
+ /// 释放资源
+ ///
+ ///
+ public virtual async ValueTask DisposeAsync()
+ {
+ lock (this.syncRoot)
+ {
+ if (this.disposed == true)
+ {
+ return;
+ }
+ this.disposed = true;
+ }
+
+ await this.Input.CompleteAsync();
+ await this.Output.CompleteAsync();
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs b/src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs
new file mode 100644
index 0000000..3e4abc9
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/DelegatingStream.cs
@@ -0,0 +1,136 @@
+namespace Gateway.Middlewares.FlowAnalytics;
+
+///
+/// 委托流
+///
+public abstract class DelegatingStream : Stream
+{
+ ///
+ /// 获取所包装的流对象
+ ///
+ protected Stream Inner { get; }
+
+ ///
+ /// 委托流
+ ///
+ ///
+ public DelegatingStream(Stream inner)
+ {
+ Inner = inner;
+ }
+
+ ///
+ public override bool CanRead => Inner.CanRead;
+
+ ///
+ public override bool CanSeek => Inner.CanSeek;
+
+ ///
+ public override bool CanWrite => Inner.CanWrite;
+
+ ///
+ public override long Length => Inner.Length;
+
+ ///
+ public override long Position
+ {
+ get => Inner.Position;
+ set => Inner.Position = value;
+ }
+
+ ///
+ public override void Flush()
+ {
+ Inner.Flush();
+ }
+
+ ///
+ public override Task FlushAsync(CancellationToken cancellationToken)
+ {
+ return Inner.FlushAsync(cancellationToken);
+ }
+
+ ///
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return Inner.Read(buffer, offset, count);
+ }
+
+ ///
+ public override int Read(Span destination)
+ {
+ return Inner.Read(destination);
+ }
+
+ ///
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return Inner.ReadAsync(buffer, offset, count, cancellationToken);
+ }
+
+ ///
+ public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default)
+ {
+ return Inner.ReadAsync(destination, cancellationToken);
+ }
+
+ ///
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ return Inner.Seek(offset, origin);
+ }
+
+ ///
+ public override void SetLength(long value)
+ {
+ Inner.SetLength(value);
+ }
+
+ ///
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ Inner.Write(buffer, offset, count);
+ }
+
+ ///
+ public override void Write(ReadOnlySpan source)
+ {
+ Inner.Write(source);
+ }
+
+ ///
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return Inner.WriteAsync(buffer, offset, count, cancellationToken);
+ }
+
+ ///
+ public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default)
+ {
+ return Inner.WriteAsync(source, cancellationToken);
+ }
+
+ ///
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+ }
+
+ ///
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return TaskToApm.End(asyncResult);
+ }
+
+ ///
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+ }
+
+ ///
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ TaskToApm.End(asyncResult);
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs b/src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs
new file mode 100644
index 0000000..85806cb
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/DuplexPipeStream.cs
@@ -0,0 +1,179 @@
+using System.Buffers;
+using System.IO.Pipelines;
+using System.Runtime.CompilerServices;
+
+namespace Gateway.Middlewares.FlowAnalytics;
+
+///
+/// IDuplexPipe封装为Stream
+///
+public class DuplexPipeStream : Stream
+{
+ private readonly PipeReader input;
+ private readonly PipeWriter output;
+ private readonly bool throwOnCancelled;
+ private volatile bool cancelCalled;
+
+ ///
+ /// IDuplexPipe封装为Stream
+ ///
+ ///
+ ///
+ public DuplexPipeStream(IDuplexPipe duplexPipe, bool throwOnCancelled = false)
+ {
+ input = duplexPipe.Input;
+ output = duplexPipe.Output;
+ this.throwOnCancelled = throwOnCancelled;
+ }
+
+ ///
+ /// 取消挂起的读取操作
+ ///
+ public void CancelPendingRead()
+ {
+ cancelCalled = true;
+ input.CancelPendingRead();
+ }
+
+ ///
+ public override bool CanRead => true;
+
+ ///
+ public override bool CanSeek => false;
+
+ ///
+ public override bool CanWrite => true;
+
+ ///
+ public override long Length => throw new NotSupportedException();
+
+ ///
+ public override long Position
+ {
+ get => throw new NotSupportedException();
+ set => throw new NotSupportedException();
+ }
+
+ ///
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ ///
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+
+ ///
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ var task = ReadAsyncInternal(new Memory(buffer, offset, count), default);
+ return task.IsCompleted ? task.Result : task.AsTask().GetAwaiter().GetResult();
+ }
+
+ ///
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
+ {
+ return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask();
+ }
+
+ ///
+ public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default)
+ {
+ return ReadAsyncInternal(destination, cancellationToken);
+ }
+
+ ///
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ ///
+ public override async Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ await output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
+ }
+
+ ///
+ public override async ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default)
+ {
+ await output.WriteAsync(source, cancellationToken);
+ }
+
+ ///
+ public override void Flush()
+ {
+ FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
+ }
+
+ ///
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ await output.FlushAsync(cancellationToken);
+ }
+
+
+ [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
+ private async ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken)
+ {
+ while (true)
+ {
+ var result = await input.ReadAsync(cancellationToken);
+ var readableBuffer = result.Buffer;
+ try
+ {
+ if (throwOnCancelled && result.IsCanceled && cancelCalled)
+ {
+ // Reset the bool
+ cancelCalled = false;
+ throw new OperationCanceledException();
+ }
+
+ if (!readableBuffer.IsEmpty)
+ {
+ // buffer.Count is int
+ var count = (int)Math.Min(readableBuffer.Length, destination.Length);
+ readableBuffer = readableBuffer.Slice(0, count);
+ readableBuffer.CopyTo(destination.Span);
+ return count;
+ }
+
+ if (result.IsCompleted)
+ {
+ return 0;
+ }
+ }
+ finally
+ {
+ input.AdvanceTo(readableBuffer.End, readableBuffer.End);
+ }
+ }
+ }
+
+ ///
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+ }
+
+ ///
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return TaskToApm.End(asyncResult);
+ }
+
+ ///
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+ }
+
+ ///
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ TaskToApm.End(asyncResult);
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs
new file mode 100644
index 0000000..0597ebc
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs
@@ -0,0 +1,11 @@
+using System.IO.Pipelines;
+
+namespace Gateway.Middlewares.FlowAnalytics;
+
+sealed class FlowAnalyzeDuplexPipe : DelegatingDuplexPipe
+{
+ public FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) :
+ base(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer))
+ {
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs
new file mode 100644
index 0000000..8cf53b2
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs
@@ -0,0 +1,28 @@
+using Microsoft.AspNetCore.Connections;
+
+namespace Gateway.Middlewares.FlowAnalytics;
+
+public class FlowAnalyzeMiddleware : IKestrelMiddleware
+{
+ private readonly IFlowAnalyzer flowAnalyzer;
+
+ public FlowAnalyzeMiddleware(IFlowAnalyzer flowAnalyzer)
+ {
+ this.flowAnalyzer = flowAnalyzer;
+ }
+
+ public async Task InvokeAsync(ConnectionDelegate next, ConnectionContext context)
+ {
+ var oldTransport = context.Transport;
+ try
+ {
+ await using var duplexPipe = new FlowAnalyzeDuplexPipe(context.Transport, flowAnalyzer);
+ context.Transport = duplexPipe;
+ await next(context);
+ }
+ finally
+ {
+ context.Transport = oldTransport;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs
new file mode 100644
index 0000000..1d3f32d
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs
@@ -0,0 +1,65 @@
+namespace Gateway.Middlewares.FlowAnalytics;
+
+sealed class FlowAnalyzeStream : DelegatingStream
+{
+ private readonly IFlowAnalyzer flowAnalyzer;
+
+ public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer)
+ : base(inner)
+ {
+ this.flowAnalyzer = flowAnalyzer;
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ int read = base.Read(buffer, offset, count);
+ this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ return read;
+ }
+
+ public override int Read(Span destination)
+ {
+ int read = base.Read(destination);
+ this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ return read;
+ }
+
+ public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
+ this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ return read;
+ }
+
+ public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default)
+ {
+ int read = await base.ReadAsync(destination, cancellationToken);
+ this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ return read;
+ }
+
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
+ base.Write(buffer, offset, count);
+ }
+
+ public override void Write(ReadOnlySpan source)
+ {
+ this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
+ base.Write(source);
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
+ return base.WriteAsync(buffer, offset, count, cancellationToken);
+ }
+
+ public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default)
+ {
+ this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
+ return base.WriteAsync(source, cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs
new file mode 100644
index 0000000..55636f4
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs
@@ -0,0 +1,98 @@
+namespace Gateway.Middlewares.FlowAnalytics;
+
+public sealed class FlowAnalyzer : IFlowAnalyzer
+{
+ private const int INTERVAL_SECONDS = 5;
+ private readonly FlowQueues readQueues = new(INTERVAL_SECONDS);
+ private readonly FlowQueues writeQueues = new(INTERVAL_SECONDS);
+
+ ///
+ /// 收到数据
+ ///
+ ///
+ ///
+ public void OnFlow(FlowType flowType, int length)
+ {
+ if (flowType == FlowType.Read)
+ {
+ this.readQueues.OnFlow(length);
+ }
+ else
+ {
+ this.writeQueues.OnFlow(length);
+ }
+ }
+
+ ///
+ /// 获取流量分析
+ ///
+ ///
+ public FlowStatisticsDto GetFlowStatistics()
+ {
+ return new FlowStatisticsDto
+ {
+ TotalRead = this.readQueues.TotalBytes,
+ TotalWrite = this.writeQueues.TotalBytes,
+ ReadRate = this.readQueues.GetRate(),
+ WriteRate = this.writeQueues.GetRate()
+ };
+ }
+
+ private class FlowQueues
+ {
+ private int cleaning = 0;
+ private long totalBytes = 0L;
+ private record QueueItem(long Ticks, int Length);
+ private readonly ConcurrentQueue queues = new();
+
+ private readonly int intervalSeconds;
+
+ public long TotalBytes => this.totalBytes;
+
+ public FlowQueues(int intervalSeconds)
+ {
+ this.intervalSeconds = intervalSeconds;
+ }
+
+ public void OnFlow(int length)
+ {
+ Interlocked.Add(ref this.totalBytes, length);
+ this.CleanInvalidRecords();
+ this.queues.Enqueue(new QueueItem(Environment.TickCount64, length));
+ }
+
+ public double GetRate()
+ {
+ this.CleanInvalidRecords();
+ return (double)this.queues.Sum(item => item.Length) / this.intervalSeconds;
+ }
+
+ ///
+ /// 清除无效记录
+ ///
+ ///
+ private bool CleanInvalidRecords()
+ {
+ if (Interlocked.CompareExchange(ref this.cleaning, 1, 0) != 0)
+ {
+ return false;
+ }
+
+ var ticks = Environment.TickCount64;
+ while (this.queues.TryPeek(out var item))
+ {
+ if (ticks - item.Ticks < this.intervalSeconds * 1000)
+ {
+ break;
+ }
+ else
+ {
+ this.queues.TryDequeue(out _);
+ }
+ }
+
+ Interlocked.Exchange(ref this.cleaning, 0);
+ return true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowType.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowType.cs
new file mode 100644
index 0000000..218eb11
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowType.cs
@@ -0,0 +1,17 @@
+namespace Gateway.Middlewares.FlowAnalytics;
+
+///
+/// 流量类型
+///
+public enum FlowType
+{
+ ///
+ /// 读取
+ ///
+ Read,
+
+ ///
+ /// 写入
+ ///
+ Wirte
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs
new file mode 100644
index 0000000..8f680d9
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs
@@ -0,0 +1,17 @@
+namespace Gateway.Middlewares.FlowAnalytics;
+
+public interface IFlowAnalyzer
+{
+ ///
+ /// 收到数据
+ ///
+ ///
+ ///
+ void OnFlow(FlowType flowType, int length);
+
+ ///
+ /// 获取速率
+ ///
+ ///
+ FlowStatisticsDto GetFlowStatistics();
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs b/src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs
new file mode 100644
index 0000000..9e4f7fa
--- /dev/null
+++ b/src/Gateway/Middlewares/FlowAnalytics/TaskToApm.cs
@@ -0,0 +1,100 @@
+namespace Gateway.Middlewares.FlowAnalytics;
+
+static class TaskToApm
+{
+ ///
+ /// Marshals the Task as an IAsyncResult, using the supplied callback and state
+ /// to implement the APM pattern.
+ ///
+ /// The Task to be marshaled.
+ /// The callback to be invoked upon completion.
+ /// The state to be stored in the IAsyncResult.
+ /// An IAsyncResult to represent the task's asynchronous operation.
+ public static IAsyncResult Begin(Task task, AsyncCallback? callback, object? state) =>
+ new TaskAsyncResult(task, state, callback);
+
+ /// Processes an IAsyncResult returned by Begin.
+ /// The IAsyncResult to unwrap.
+ public static void End(IAsyncResult asyncResult)
+ {
+ if (asyncResult is TaskAsyncResult twar)
+ {
+ twar._task.GetAwaiter().GetResult();
+ return;
+ }
+
+ throw new ArgumentNullException();
+ }
+
+ /// Processes an IAsyncResult returned by Begin.
+ /// The IAsyncResult to unwrap.
+ public static TResult End(IAsyncResult asyncResult)
+ {
+ if (asyncResult is TaskAsyncResult twar && twar._task is Task task)
+ {
+ return task.GetAwaiter().GetResult();
+ }
+
+ throw new ArgumentNullException();
+ }
+
+ /// Provides a simple IAsyncResult that wraps a Task.
+ ///
+ /// We could use the Task as the IAsyncResult if the Task's AsyncState is the same as the object state,
+ /// but that's very rare, in particular in a situation where someone cares about allocation, and always
+ /// using TaskAsyncResult simplifies things and enables additional optimizations.
+ ///
+ internal sealed class TaskAsyncResult : IAsyncResult
+ {
+ /// The wrapped Task.
+ internal readonly Task _task;
+ /// Callback to invoke when the wrapped task completes.
+ private readonly AsyncCallback? _callback;
+
+ /// Initializes the IAsyncResult with the Task to wrap and the associated object state.
+ /// The Task to wrap.
+ /// The new AsyncState value.
+ /// Callback to invoke when the wrapped task completes.
+ internal TaskAsyncResult(Task task, object? state, AsyncCallback? callback)
+ {
+ Debug.Assert(task != null);
+ _task = task;
+ AsyncState = state;
+
+ if (task.IsCompleted)
+ {
+ // Synchronous completion. Invoke the callback. No need to store it.
+ CompletedSynchronously = true;
+ callback?.Invoke(this);
+ }
+ else if (callback != null)
+ {
+ // Asynchronous completion, and we have a callback; schedule it. We use OnCompleted rather than ContinueWith in
+ // order to avoid running synchronously if the task has already completed by the time we get here but still run
+ // synchronously as part of the task's completion if the task completes after (the more common case).
+ _callback = callback;
+ _task.ConfigureAwait(continueOnCapturedContext: false)
+ .GetAwaiter()
+ .OnCompleted(InvokeCallback); // allocates a delegate, but avoids a closure
+ }
+ }
+
+ /// Invokes the callback.
+ private void InvokeCallback()
+ {
+ Debug.Assert(!CompletedSynchronously);
+ Debug.Assert(_callback != null);
+ _callback.Invoke(this);
+ }
+
+ /// Gets a user-defined object that qualifies or contains information about an asynchronous operation.
+ public object? AsyncState { get; }
+ /// Gets a value that indicates whether the asynchronous operation completed synchronously.
+ /// This is set lazily based on whether the has completed by the time this object is created.
+ public bool CompletedSynchronously { get; }
+ /// Gets a value that indicates whether the asynchronous operation has completed.
+ public bool IsCompleted => _task.IsCompleted;
+ /// Gets a that is used to wait for an asynchronous operation to complete.
+ public WaitHandle AsyncWaitHandle => ((IAsyncResult)_task).AsyncWaitHandle;
+ }
+}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/IKestrelMiddleware.cs b/src/Gateway/Middlewares/IKestrelMiddleware.cs
new file mode 100644
index 0000000..7db518b
--- /dev/null
+++ b/src/Gateway/Middlewares/IKestrelMiddleware.cs
@@ -0,0 +1,11 @@
+using Microsoft.AspNetCore.Connections;
+
+namespace Gateway.Middlewares;
+
+///
+/// Kestrel的中间件接口
+///
+public interface IKestrelMiddleware
+{
+ Task InvokeAsync(ConnectionDelegate next, ConnectionContext context);
+}
\ No newline at end of file
diff --git a/src/Gateway/Program.cs b/src/Gateway/Program.cs
index c50e208..65f0763 100644
--- a/src/Gateway/Program.cs
+++ b/src/Gateway/Program.cs
@@ -1,12 +1,14 @@
#region FreeSql类型转换
+using Gateway.Middlewares.FlowAnalytics;
+
Utils.TypeHandlers.TryAdd(typeof(Dictionary), new StringJsonHandler>());
Utils.TypeHandlers.TryAdd(typeof(List), new StringJsonHandler>());
Utils.TypeHandlers.TryAdd(typeof(string[]), new StringJsonHandler());
#endregion
-var builder = WebApplication.CreateBuilder(args);
+var builder = WebApplication.CreateSlimBuilder(args);
var directory = new DirectoryInfo("/data");
if (!directory.Exists)
@@ -61,6 +63,8 @@ builder.WebHost.UseKestrel(options =>
});
});
+builder.Services.AddSingleton();
+
builder.WebHost.ConfigureKestrel(kestrel =>
{
kestrel.Limits.MaxRequestBodySize = null;
@@ -69,9 +73,14 @@ builder.WebHost.ConfigureKestrel(kestrel =>
{
portOptions.Protocols = HttpProtocols.Http1AndHttp2AndHttp3;
portOptions.UseHttps();
+ portOptions.Use();
});
- kestrel.ListenAnyIP(8080, portOptions => { portOptions.Protocols = HttpProtocols.Http1AndHttp2; });
+ kestrel.ListenAnyIP(8080, portOptions =>
+ {
+ portOptions.Protocols = HttpProtocols.Http1AndHttp2;
+ portOptions.Use();
+ });
});
#region Jwt
@@ -124,6 +133,7 @@ builder.Services.AddSingleton();
builder.Services.AddSingleton();
builder.Services.AddSingleton();
builder.Services.AddSingleton();
+builder.Services.AddSingleton();
builder.Services.AddSingleton();
diff --git a/src/Gateway/Services/SystemService.cs b/src/Gateway/Services/SystemService.cs
index 830edf9..c4b5e03 100644
--- a/src/Gateway/Services/SystemService.cs
+++ b/src/Gateway/Services/SystemService.cs
@@ -1,6 +1,8 @@
-namespace Gateway.Services;
+using Gateway.Middlewares.FlowAnalytics;
-public class SystemService
+namespace Gateway.Services;
+
+public class SystemService(IFlowAnalyzer flowAnalyzer)
{
public static async Task StreamAsync(HttpContext context)
{
@@ -27,7 +29,7 @@ public class SystemService
initialBytesSent += interfaceStats.BytesSent;
initialBytesReceived += interfaceStats.BytesReceived;
}
-
+
// 等待1秒钟
await Task.Delay(1000, context.RequestAborted);
@@ -65,6 +67,11 @@ public class SystemService
}
}
}
+
+ public FlowStatisticsDto FlowStatistics()
+ {
+ return flowAnalyzer.GetFlowStatistics();
+ }
}
public static class SystemExtension
@@ -73,5 +80,8 @@ public static class SystemExtension
{
app.MapGet("/api/gateway/system", async context =>
await SystemService.StreamAsync(context));
+
+ app.MapGet("/api/gateway/system/flow-statistics",
+ (SystemService systemService) => systemService.FlowStatistics());
}
}
\ No newline at end of file
--
Gitee
From c9b4718c2b88f1fc0eb5b5d1d91de8a13b726c8d Mon Sep 17 00:00:00 2001
From: token <239573049@qq.com>
Date: Wed, 31 Jan 2024 19:09:48 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=81=E9=87=8F?=
=?UTF-8?q?=E7=9B=91=E6=8E=A7=20=E5=9F=BA=E4=BA=8EKestrel=E5=AE=9E?=
=?UTF-8?q?=E7=8E=B0=E7=9B=91=E6=8E=A7=E5=87=BA=E5=85=A5=E5=8F=A3=E6=B5=81?=
=?UTF-8?q?=E9=87=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
README.md | 40 +++++++++-
.../BackgroundServices/GatewayBackground.cs | 14 +++-
src/Gateway/Dto/NetWorkDto.cs | 33 ++++++++
src/Gateway/Entities/SystemLoggerEntity.cs | 10 ++-
.../FlowAnalytics/DelegatingDuplexPipe.cs | 10 +--
.../FlowAnalytics/FlowAnalyzeDuplexPipe.cs | 9 +--
.../FlowAnalytics/FlowAnalyzeMiddleware.cs | 9 +--
.../FlowAnalytics/FlowAnalyzeStream.cs | 26 +++---
.../Middlewares/FlowAnalytics/FlowAnalyzer.cs | 68 ++++++++--------
.../FlowAnalytics/IFlowAnalyzer.cs | 5 ++
src/Gateway/Program.cs | 17 +++-
src/Gateway/Services/SystemService.cs | 41 +++++++---
src/Gateway/TokenBucket.cs | 37 +++++++++
web/src/pages/home/index.tsx | 79 +++++++++++--------
14 files changed, 277 insertions(+), 121 deletions(-)
create mode 100644 src/Gateway/TokenBucket.cs
diff --git a/README.md b/README.md
index 1880e08..7f54968 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,7 @@ Gateway提供了基本的管理服务,提供简单的登录授权,和实时
- [x] dashboard监控
- [x] 静态文件服务代理
- [x] 穿透隧道功能
+- [x] 出入口流量监控
- [ ] 动态插件管理
## 技术栈
@@ -29,7 +30,6 @@ Gateway提供了基本的管理服务,提供简单的登录授权,和实时
- semi 用于提供基础组件
- react-router-dom 用于路由管理
-
## 镜像执行指令
```bash
@@ -118,6 +118,7 @@ services:
`/app/certificates`:
- 这个是系统证书默认存放目录,如果映射了目录则需要提供自己的证书。
+
## 使用隧道
```yml
@@ -152,7 +153,42 @@ services:
增加`TUNNEL_PASSWORD`环境变量,默认为空不设置密码
-下载隧道客户端 https://gitee.com/hejiale010426/Gateway/releases 然后解压压缩包,打开appsettings.json文件修改Tunnel节点的Url,如果Gateway使用了TUNNEL_PASSWORD,那么你的URL应该是`https://localhost:8081/api/gateway/connect-h2?host=backend1.app&password=dd666666`,
+下载隧道客户端 然后解压压缩包,打开appsettings.json文件修改Tunnel节点的Url,如果Gateway使用了TUNNEL_PASSWORD,那么你的URL应该是`https://localhost:8081/api/gateway/connect-h2?host=backend1.app&password=dd666666`,
`host`是在集群中的集群端点的域名,这个域名就是定义到我们的隧道客户端的`host`的这个参数,请保证值的唯一性,当绑定集群的路由匹配成功以后则会访问图片定义的端点,如果并没有存在节点那么他会直接代理。
![输入图片说明](img/%E9%9B%86%E7%BE%A4-01.png.png)
+
+## 出入流量监控
+
+使用环境变量控制是否启用流量监控,使用环境变量`ENABLE_FLOW_MONITORING`设置我们是否启用流量监控,如果为空则默认启动流量监控,然后可以打开我们的控制面板查看流量监控的数据。
+
+```yml
+
+services:
+ gateway-api:
+ image: registry.cn-shenzhen.aliyuncs.com/tokengo/gateway-api
+ restart: always
+ container_name: gateway-api
+ environment:
+ USER: root
+ PASS: Aa010426.
+ HTTPS_PASSWORD: dd666666
+ HTTPS_FILE: gateway.pfx
+ ENABLE_FLOW_MONITORING: true
+ ports:
+ - 8200:8080
+ volumes:
+ - ./data:/data/
+ - ./app/certificates:/app/certificates
+
+ gateway-web:
+ image: registry.cn-shenzhen.aliyuncs.com/tokengo/gateway-web
+ restart: always
+ container_name: gateway-web
+ privileged: true
+ environment:
+ api_url: http://token-ai.cn:8200
+ ports:
+ - 10800:80
+
+```
diff --git a/src/Gateway/BackgroundServices/GatewayBackground.cs b/src/Gateway/BackgroundServices/GatewayBackground.cs
index 4db3a71..2d63e0a 100644
--- a/src/Gateway/BackgroundServices/GatewayBackground.cs
+++ b/src/Gateway/BackgroundServices/GatewayBackground.cs
@@ -1,8 +1,11 @@
-namespace Gateway.BackgroundServices;
+using Gateway.Middlewares.FlowAnalytics;
+
+namespace Gateway.BackgroundServices;
public class GatewayBackgroundService(
GatewayService gatewayService,
CertificateService certificateService,
+ IFlowAnalyzer flowAnalyzer,
IFreeSql freeSql) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@@ -42,15 +45,20 @@ public class GatewayBackgroundService(
#endregion
+ var flow = flowAnalyzer.GetFlowStatistics();
+
var systemLoggerEntity = new SystemLoggerEntity
{
RequestCount = GatewayMiddleware.CurrentRequestCount,
ErrorRequestCount = GatewayMiddleware.CurrentErrorCount,
- CurrentTime = DateTime.Now.AddDays(-1)
+ CurrentTime = DateTime.Now.AddDays(-1),
+ ReadRate = flow.TotalRead,
+ WriteRate = flow.TotalWrite
};
-
+
// 清空请求计数器
GatewayMiddleware.ClearRequestCount();
+ flowAnalyzer.CleanRecords();
await freeSql.Insert(systemLoggerEntity).ExecuteAffrowsAsync();
}
diff --git a/src/Gateway/Dto/NetWorkDto.cs b/src/Gateway/Dto/NetWorkDto.cs
index a0fdc69..729fbe3 100644
--- a/src/Gateway/Dto/NetWorkDto.cs
+++ b/src/Gateway/Dto/NetWorkDto.cs
@@ -60,4 +60,37 @@ public class NetWorkDto(long received, long sent)
get => _totalErrorCount + CurrentErrorCount;
set => value = _totalErrorCount;
}
+
+
+ ///
+ /// 获取读取速率
+ ///
+ public double ReadRate { get; init; }
+
+ ///
+ /// 获取写入速率
+ ///
+ public double WriteRate { get; init; }
+
+ private double _totalRead;
+
+ ///
+ /// 获取总读上行
+ ///
+ public double TotalRead
+ {
+ get => (double)((decimal)_totalRead + (decimal)ReadRate);
+ init => _totalRead = value;
+ }
+
+ private double _totalWrite;
+
+ ///
+ /// 获取总下行
+ ///
+ public double TotalWrite
+ {
+ get => (double)((decimal)_totalWrite + (decimal)WriteRate);
+ init => _totalWrite = value;
+ }
}
\ No newline at end of file
diff --git a/src/Gateway/Entities/SystemLoggerEntity.cs b/src/Gateway/Entities/SystemLoggerEntity.cs
index 89ad2ec..7bef5fe 100644
--- a/src/Gateway/Entities/SystemLoggerEntity.cs
+++ b/src/Gateway/Entities/SystemLoggerEntity.cs
@@ -17,5 +17,13 @@ public sealed class SystemLoggerEntity : Entity
///
public DateTime CurrentTime { get; set; }
-
+ ///
+ /// 获取读取速率
+ ///
+ public double ReadRate { get; init; }
+
+ ///
+ /// 获取写入速率
+ ///
+ public double WriteRate { get; init; }
}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs
index 632da93..a2cf5f8 100644
--- a/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs
+++ b/src/Gateway/Middlewares/FlowAnalytics/DelegatingDuplexPipe.cs
@@ -8,8 +8,8 @@ namespace Gateway.Middlewares.FlowAnalytics;
///
public class DelegatingDuplexPipe : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream
{
- private bool disposed;
- private readonly object syncRoot = new();
+ private bool _disposed;
+ private readonly object _syncRoot = new();
///
/// 输入对象
@@ -52,13 +52,13 @@ public class DelegatingDuplexPipe : IDuplexPipe, IAsyncDispos
///
public virtual async ValueTask DisposeAsync()
{
- lock (this.syncRoot)
+ lock (this._syncRoot)
{
- if (this.disposed == true)
+ if (this._disposed == true)
{
return;
}
- this.disposed = true;
+ this._disposed = true;
}
await this.Input.CompleteAsync();
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs
index 0597ebc..92de095 100644
--- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeDuplexPipe.cs
@@ -2,10 +2,5 @@ using System.IO.Pipelines;
namespace Gateway.Middlewares.FlowAnalytics;
-sealed class FlowAnalyzeDuplexPipe : DelegatingDuplexPipe
-{
- public FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) :
- base(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer))
- {
- }
-}
\ No newline at end of file
+sealed class FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer)
+ : DelegatingDuplexPipe(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer));
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs
index 8cf53b2..60c0152 100644
--- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeMiddleware.cs
@@ -2,15 +2,8 @@
namespace Gateway.Middlewares.FlowAnalytics;
-public class FlowAnalyzeMiddleware : IKestrelMiddleware
+public sealed class FlowAnalyzeMiddleware(IFlowAnalyzer flowAnalyzer) : IKestrelMiddleware
{
- private readonly IFlowAnalyzer flowAnalyzer;
-
- public FlowAnalyzeMiddleware(IFlowAnalyzer flowAnalyzer)
- {
- this.flowAnalyzer = flowAnalyzer;
- }
-
public async Task InvokeAsync(ConnectionDelegate next, ConnectionContext context)
{
var oldTransport = context.Transport;
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs
index 1d3f32d..1ac61a7 100644
--- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzeStream.cs
@@ -1,65 +1,57 @@
namespace Gateway.Middlewares.FlowAnalytics;
-sealed class FlowAnalyzeStream : DelegatingStream
+sealed class FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer) : DelegatingStream(inner)
{
- private readonly IFlowAnalyzer flowAnalyzer;
-
- public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer)
- : base(inner)
- {
- this.flowAnalyzer = flowAnalyzer;
- }
-
public override int Read(byte[] buffer, int offset, int count)
{
int read = base.Read(buffer, offset, count);
- this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ flowAnalyzer.OnFlow(FlowType.Read, read);
return read;
}
public override int Read(Span destination)
{
int read = base.Read(destination);
- this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ flowAnalyzer.OnFlow(FlowType.Read, read);
return read;
}
public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
- this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ flowAnalyzer.OnFlow(FlowType.Read, read);
return read;
}
public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default)
{
int read = await base.ReadAsync(destination, cancellationToken);
- this.flowAnalyzer.OnFlow(FlowType.Read, read);
+ flowAnalyzer.OnFlow(FlowType.Read, read);
return read;
}
public override void Write(byte[] buffer, int offset, int count)
{
- this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
+ flowAnalyzer.OnFlow(FlowType.Wirte, count);
base.Write(buffer, offset, count);
}
public override void Write(ReadOnlySpan source)
{
- this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
+ flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
base.Write(source);
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
- this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
+ flowAnalyzer.OnFlow(FlowType.Wirte, count);
return base.WriteAsync(buffer, offset, count, cancellationToken);
}
public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default)
{
- this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
+ flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
return base.WriteAsync(source, cancellationToken);
}
}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs
index 55636f4..b223d9c 100644
--- a/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs
+++ b/src/Gateway/Middlewares/FlowAnalytics/FlowAnalyzer.cs
@@ -3,8 +3,8 @@ namespace Gateway.Middlewares.FlowAnalytics;
public sealed class FlowAnalyzer : IFlowAnalyzer
{
private const int INTERVAL_SECONDS = 5;
- private readonly FlowQueues readQueues = new(INTERVAL_SECONDS);
- private readonly FlowQueues writeQueues = new(INTERVAL_SECONDS);
+ private readonly FlowQueues _readQueues = new(INTERVAL_SECONDS);
+ private readonly FlowQueues _writeQueues = new(INTERVAL_SECONDS);
///
/// 收到数据
@@ -15,14 +15,20 @@ public sealed class FlowAnalyzer : IFlowAnalyzer
{
if (flowType == FlowType.Read)
{
- this.readQueues.OnFlow(length);
+ _readQueues.OnFlow(length);
}
else
{
- this.writeQueues.OnFlow(length);
+ _writeQueues.OnFlow(length);
}
}
+ public void CleanRecords()
+ {
+ _readQueues.CleanRecords();
+ _writeQueues.CleanRecords();
+ }
+
///
/// 获取流量分析
///
@@ -31,40 +37,35 @@ public sealed class FlowAnalyzer : IFlowAnalyzer
{
return new FlowStatisticsDto
{
- TotalRead = this.readQueues.TotalBytes,
- TotalWrite = this.writeQueues.TotalBytes,
- ReadRate = this.readQueues.GetRate(),
- WriteRate = this.writeQueues.GetRate()
+ TotalRead = _readQueues.TotalBytes,
+ TotalWrite = _writeQueues.TotalBytes,
+ ReadRate = _readQueues.GetRate(),
+ WriteRate = _writeQueues.GetRate()
};
}
- private class FlowQueues
+ private class FlowQueues(int intervalSeconds)
{
- private int cleaning = 0;
- private long totalBytes = 0L;
- private record QueueItem(long Ticks, int Length);
- private readonly ConcurrentQueue queues = new();
+ private int _cleaning = 0;
+ private long _totalBytes = 0L;
- private readonly int intervalSeconds;
+ private record QueueItem(long Ticks, int Length);
- public long TotalBytes => this.totalBytes;
+ private readonly ConcurrentQueue _queues = new();
- public FlowQueues(int intervalSeconds)
- {
- this.intervalSeconds = intervalSeconds;
- }
+ public long TotalBytes => this._totalBytes;
public void OnFlow(int length)
{
- Interlocked.Add(ref this.totalBytes, length);
+ Interlocked.Add(ref this._totalBytes, length);
this.CleanInvalidRecords();
- this.queues.Enqueue(new QueueItem(Environment.TickCount64, length));
+ this._queues.Enqueue(new QueueItem(Environment.TickCount64, length));
}
public double GetRate()
{
- this.CleanInvalidRecords();
- return (double)this.queues.Sum(item => item.Length) / this.intervalSeconds;
+ CleanInvalidRecords();
+ return (double)_queues.Sum(item => item.Length) / intervalSeconds;
}
///
@@ -73,26 +74,31 @@ public sealed class FlowAnalyzer : IFlowAnalyzer
///
private bool CleanInvalidRecords()
{
- if (Interlocked.CompareExchange(ref this.cleaning, 1, 0) != 0)
+ if (Interlocked.CompareExchange(ref this._cleaning, 1, 0) != 0)
{
return false;
}
var ticks = Environment.TickCount64;
- while (this.queues.TryPeek(out var item))
+ while (_queues.TryPeek(out var item))
{
- if (ticks - item.Ticks < this.intervalSeconds * 1000)
+ if (ticks - item.Ticks < intervalSeconds * 1000)
{
break;
}
- else
- {
- this.queues.TryDequeue(out _);
- }
+
+ _queues.TryDequeue(out _);
}
- Interlocked.Exchange(ref this.cleaning, 0);
+ Interlocked.Exchange(ref _cleaning, 0);
return true;
}
+
+
+ public void CleanRecords()
+ {
+ _queues.Clear();
+ Interlocked.Exchange(ref _cleaning, 0);
+ }
}
}
\ No newline at end of file
diff --git a/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs
index 8f680d9..90d83b2 100644
--- a/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs
+++ b/src/Gateway/Middlewares/FlowAnalytics/IFlowAnalyzer.cs
@@ -14,4 +14,9 @@ public interface IFlowAnalyzer
///
///
FlowStatisticsDto GetFlowStatistics();
+
+ ///
+ /// 清楚记录
+ ///
+ void CleanRecords();
}
\ No newline at end of file
diff --git a/src/Gateway/Program.cs b/src/Gateway/Program.cs
index 8e9d27f..cae5217 100644
--- a/src/Gateway/Program.cs
+++ b/src/Gateway/Program.cs
@@ -1,5 +1,7 @@
#region FreeSql类型转换
+using Gateway.Middlewares.FlowAnalytics;
+
Utils.TypeHandlers.TryAdd(typeof(Dictionary), new StringJsonHandler>());
Utils.TypeHandlers.TryAdd(typeof(List), new StringJsonHandler>());
Utils.TypeHandlers.TryAdd(typeof(string[]), new StringJsonHandler());
@@ -30,6 +32,8 @@ builder.Configuration.GetSection(GatewayOptions.Name)
// 获取环境变量
var https_password = Environment.GetEnvironmentVariable("HTTPS_PASSWORD") ?? "dd666666";
var https_file = Environment.GetEnvironmentVariable("HTTPS_FILE") ?? "gateway.pfx";
+var enable_flow_monitoring = Environment.GetEnvironmentVariable("ENABLE_FLOW_MONITORING") ?? "true";
+
builder.WebHost.UseKestrel(options =>
{
@@ -69,12 +73,21 @@ builder.WebHost.ConfigureKestrel(kestrel =>
{
portOptions.Protocols = HttpProtocols.Http1AndHttp2AndHttp3;
portOptions.UseHttps();
+
+ if (enable_flow_monitoring == "true")
+ {
+ portOptions.Use();
+ }
});
kestrel.ListenAnyIP(8080, portOptions =>
{
portOptions.Protocols = HttpProtocols.Http1AndHttp2;
- portOptions.Use();
+
+ if (enable_flow_monitoring == "true")
+ {
+ portOptions.Use();
+ }
});
});
@@ -140,6 +153,8 @@ builder.Services.AddReverseProxy()
builder.Services.AddTunnelServices();
+builder.Services.AddSingleton();
+
var app = builder.Build();
app.UseCors("AllowAll");
diff --git a/src/Gateway/Services/SystemService.cs b/src/Gateway/Services/SystemService.cs
index dbbde70..1850c1d 100644
--- a/src/Gateway/Services/SystemService.cs
+++ b/src/Gateway/Services/SystemService.cs
@@ -1,14 +1,24 @@
-namespace Gateway.Services;
+using Gateway.Middlewares.FlowAnalytics;
-public class SystemService
+namespace Gateway.Services;
+
+///
+/// 系统服务
+///
+public static class SystemService
{
- public static async Task StreamAsync(HttpContext context)
+ public static async Task StreamAsync(HttpContext context, IFreeSql sql, IFlowAnalyzer flowAnalyzer)
{
// 使用sse,返回响应头
context.Response.Headers.ContentType = "text/event-stream";
var i = 0;
+ var totalErrorCount = (double)await sql.Select().SumAsync(x => x.ErrorRequestCount);
+ var totalRequestCount = (double)await sql.Select().SumAsync(x => x.RequestCount);
+ var totalRead = (double)await sql.Select().SumAsync(x => x.ReadRate);
+ var totalWrite = (double)await sql.Select().SumAsync(x => x.WriteRate);
+
while (!context.RequestAborted.IsCancellationRequested)
{
// 获取所有网络接口
@@ -27,7 +37,7 @@ public class SystemService
initialBytesSent += interfaceStats.BytesSent;
initialBytesReceived += interfaceStats.BytesReceived;
}
-
+
// 等待1秒钟
await Task.Delay(1000, context.RequestAborted);
@@ -49,8 +59,18 @@ public class SystemService
var totalBytesSentIn1Sec = bytesSentAfter1Sec - initialBytesSent;
var totalBytesReceivedIn1Sec = bytesReceivedAfter1Sec - initialBytesReceived;
+ var flowStatisticsDto = flowAnalyzer.GetFlowStatistics();
+
var data =
- $"data:{JsonSerializer.Serialize(new NetWorkDto(totalBytesReceivedIn1Sec, totalBytesSentIn1Sec))}\n\n";
+ $"data:{JsonSerializer.Serialize(new NetWorkDto(totalBytesReceivedIn1Sec, totalBytesSentIn1Sec)
+ {
+ TotalErrorCount = totalErrorCount,
+ TotalRequestCount = totalRequestCount,
+ ReadRate = flowStatisticsDto.TotalRead,
+ WriteRate = flowStatisticsDto.TotalWrite,
+ TotalWrite = totalRead,
+ TotalRead = totalWrite
+ })}\n\n";
// 将数据写入到响应流中
await context.Response.WriteAsync(data, context.RequestAborted);
@@ -58,8 +78,8 @@ public class SystemService
i++;
- // 只维持5秒的连接
- if (i > 5)
+ // 只维持10秒的连接
+ if (i > 10)
{
break;
}
@@ -71,10 +91,7 @@ public static class SystemExtension
{
public static void MapSystem(this IEndpointRouteBuilder app)
{
- app.MapGet("/api/gateway/system", async context =>
- await SystemService.StreamAsync(context));
-
- app.MapGet("/api/gateway/system/flow-statistics",
- (SystemService systemService) => systemService.FlowStatistics());
+ app.MapGet("/api/gateway/system", async (HttpContext context, IFreeSql sql, IFlowAnalyzer flowAnalyzer) =>
+ await SystemService.StreamAsync(context, sql, flowAnalyzer));
}
}
\ No newline at end of file
diff --git a/src/Gateway/TokenBucket.cs b/src/Gateway/TokenBucket.cs
new file mode 100644
index 0000000..ec015af
--- /dev/null
+++ b/src/Gateway/TokenBucket.cs
@@ -0,0 +1,37 @@
+namespace Gateway;
+
+///
+/// 令牌桶,用于限制流量。
+///
+public struct TokenBucket(int maxTokens)
+{
+ private readonly int _maxTokens = maxTokens;
+ private int _tokens = maxTokens;
+ private DateTime _lastCheck = DateTime.UtcNow;
+
+ private bool GetTokens(int count)
+ {
+ var now = DateTime.UtcNow;
+ if ((now - _lastCheck).TotalSeconds > 1)
+ {
+ _tokens = _maxTokens;
+ _lastCheck = now;
+ }
+
+ if (_tokens < count) return false;
+
+ _tokens -= count;
+
+ return true;
+ }
+
+ public async ValueTask WaitForTokens(int count)
+ {
+ while (!GetTokens(count))
+ {
+ var now = DateTime.UtcNow;
+ var timeToNextToken = (now - _lastCheck).Microseconds;
+ await Task.Delay(timeToNextToken);
+ }
+ }
+}
\ No newline at end of file
diff --git a/web/src/pages/home/index.tsx b/web/src/pages/home/index.tsx
index 88de3ba..4024e69 100644
--- a/web/src/pages/home/index.tsx
+++ b/web/src/pages/home/index.tsx
@@ -8,6 +8,10 @@ export default function Home() {
const [totalErrorCount, setTotalErrorCount] = useState(0);
const [currentRequestCount, setCurrentRequestCount] = useState(0);
const [currentErrorCount, setCurrentErrorCount] = useState(0);
+ const [readRate, setReadRate] = useState("");
+ const [writeRate, setWriteRate] = useState("");
+ const [totalRead, setTotalRead] = useState("");
+ const [totalWrite, setTotalWrite] = useState("");
useEffect(() => {
var chartDom = document.getElementById('network')!;
@@ -58,23 +62,8 @@ export default function Home() {
formatter: function (params: any) {
// 将字节转换KB MB GB
let value = params.value;
- let unit = 'B';
- if (value > 1024) {
- value = value / 1024;
- unit = 'KB';
- }
-
- if (value > 1024) {
- value = value / 1024;
- unit = 'MB';
- }
-
- if (value > 1024) {
- value = value / 1024;
- unit = 'GB';
- }
-
- return value.toFixed(2) + unit;
+
+ return formatBytes(value, 2);
}
}
},
@@ -92,23 +81,8 @@ export default function Home() {
formatter: function (params: any) {
// 将字节转换KB MB GB
let value = params.value;
- let unit = 'B';
- if (value > 1024) {
- value = value / 1024;
- unit = 'KB';
- }
-
- if (value > 1024) {
- value = value / 1024;
- unit = 'MB';
- }
-
- if (value > 1024) {
- value = value / 1024;
- unit = 'GB';
- }
-
- return value.toFixed(2) + unit;
+
+ return formatBytes(value, 2);
}
}
}
@@ -135,6 +109,12 @@ export default function Home() {
setTotalErrorCount(chunk.TotalErrorCount);
setTotalRequestCount(chunk.TotalRequestCount);
+ setReadRate(formatBytes(chunk.ReadRate));
+ setWriteRate(formatBytes(chunk.WriteRate));
+ setTotalRead(formatBytes(chunk.TotalRead));
+ setTotalWrite(formatBytes(chunk.TotalWrite));
+
+
option && myChart.setOption(option);
myChart.resize();
@@ -156,6 +136,15 @@ export default function Home() {
};
}, []); // 空依赖数组意味着这个effect只会在组件挂载时执行一次
+ function formatBytes(bytes: number, decimals = 2) {
+ if (bytes === 0) return '0 Bytes';
+ const k = 1024;
+ const dm = decimals < 0 ? 0 : decimals;
+ const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
+ const i = Math.floor(Math.log(bytes) / Math.log(k));
+ return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
+ }
+
return (
+
+
+
+ 入口流量总计:{totalRead}
+
+
+
+
+ 出口流量总计:{totalWrite}
+
+
+
+
+ 当天入口流量:{readRate}
+
+
+
+
+ 当天出口流量:{writeRate}
+
+
+