반응형
System.IO.Pipelines 는 .NET에서 고성능 I/O 를 더 쉽게 수행할 수 있도록 설계된 라이브러리이다.
Kestrel 을 업계에서 가장 빠른 웹 서버 중 하나로 만들기 위해 수행한 작업에서 탄생했다.
웹 소켓 라이브러리인 SignalR 에도 포함이 되어있고 네트워크단에서 처리를 하는 SDK에는 포함이 대부분 되어 있는 것 같다.
해당 벤치마크에서 확인이 가능하다.
https://www.techempower.com/benchmarks/#section=data-r21&hw=ph&test=plaintext
TechEmpower Framework Benchmarks
www.techempower.com
소켓 버퍼로부터 유저 어플리케이션단으로 데이터를 가져오기 위해 해당 버퍼 만큼의 메모리를 할당하고 값을 복사하는 작업 등이 필요하고 각 개발자마다 이 부분을 속도나 메모리나 GC 등 단점 등을 보완하기 위해 각자의 방법으로 처리합니다.
아래는 패킷의 완성이 \r\n 으로 데이터의 끝을 알리는 MS형님의 예제이다.
- \r\n을 찾을 때까지 들어오는 데이터를 버퍼링 해야 합니다.
- 버퍼에 반환된 모든 줄 구문을 분석해야 한다.
- 보낸 사이즈보다 버퍼가 작아 데이터가 덜 들어오는 경우엔 해당 데이터를 저장을 해야 하고 다음 데이터를 이어 받아야 한다.
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset
var lineLength = linePosition - bytesConsumed;
// Process the line
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line we consumed (including \n)
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
- 버퍼 풀을 이용해서 1024만큼의 버퍼 풀링을 시작했으나 1024 사이즈보다 작은 데이터들이 들어오는 경우 메모리를 더 많이 사용한다.
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes
bytesBuffered += bytesRead;
do
{
// Look for a EOL in the buffered data
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset
var lineLength = linePosition - bytesConsumed;
// Process the line
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line we consumed (including \n)
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
- 메모리 낭비를 막기 위해 512보다 작은 경우엔 새로 메모리를 할당한다.
- 처리량을 늘리기 위해 소켓에 들어온 데이터를 읽는 것과 유저 어플리케이션에 저장된 버퍼를 처리하는 로직을 분산해서 동시에 처리한다.
- 기타 등등 최적화를 위한 로직이 들어간다면 코드가 복잡해진다.
System.IO.Pipelines가 있는 TCP 서버
- new Pipie로 시작을 한다. 이 객체는 내부적으로 LOH 힙에 메모리를 만들며 쓰레드로부터 보호되는 객체를 생성한다.
- https://github.com/dotnet/runtime/blob/57bfe474518ab5b7cfe6bf7424a79ce3af9d6657/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
- 아래 코드는 소켓의 스트림을 읽어 파이프 안의 메모리에 저장이 되고 그 파이프를 처리하는 로직이 존재하기에 Task가 2개가 생성된 것이다. 동시 처리가 필요없다면 하나만으로도 처리가 가능하다.
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
return Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Tell the PipeReader that there's no more data coming
writer.Complete();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
// Look for a EOL in the buffer
position = buffer.PositionOf((byte)'\n');
if (position != null)
{
// Process the line
ProcessLine(buffer.Slice(0, position.Value));
// Skip the line + the \n character (basically position)
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
while (position != null);
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete
reader.Complete();
}
- https://github.com/dotnet/runtime/blob/57bfe474518ab5b7cfe6bf7424a79ce3af9d6657/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
- https://github.com/dotnet/runtime/blob/main/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
- 읽기용 데이터의 시작 인덱스 _readHeadIndex 와 읽기용 데이터의 끝 _readTailIndex
- 쓰기용 데이터의 시작 인덱스 _writingHead 와 바이트의 수 _writingHeadBytesBuffered
- 이 인덱스들이 내부에서 왔다갔다 하면서 카피 대신에 해당하는 버퍼의 index들의 데이터들을 스택에 올려 반환하면서 처리를 하게 된다.
- 스택을 사용함으로써 GC가 돌지 않게 만들 수 있다.
References
https://github.com/davidfowl/TcpEcho
반응형
'개발관련 > C#' 카테고리의 다른 글
Dictionary 와 HashTable, SortedDictionary 차이 (0) | 2024.08.30 |
---|---|
리플렉션(Reflection)과 표현식 트리(Expression Tree) (0) | 2023.10.15 |
Thread Synchronization spinlock vs lock performance (0) | 2022.12.20 |
.net core 패킷 데이터가 30K 바이트보다 큰 경우 디스크에 저장 (0) | 2022.04.06 |
소켓 비정상 종료 처리 TcpKeepAlive (0) | 2018.09.13 |