반응형

System.IO.Pipelines 는 .NET에서 고성능 I/O 를 더 쉽게 수행할 수 있도록 설계된 라이브러리이다.

Kestrel 을 업계에서 가장 빠른 웹 서버 중 하나로 만들기 위해 수행한 작업에서 탄생했다.

웹 소켓 라이브러리인 SignalR 에도 포함이 되어있고 네트워크단에서 처리를 하는 SDK에는 포함이 대부분 되어 있는 것 같다.

해당 벤치마크에서 확인이 가능하다.

 

https://www.techempower.com/benchmarks/#section=data-r21&hw=ph&test=plaintext

 

TechEmpower Framework Benchmarks

 

www.techempower.com

 

소켓 버퍼로부터 유저 어플리케이션단으로 데이터를 가져오기 위해 해당 버퍼 만큼의 메모리를 할당하고 값을 복사하는 작업 등이 필요하고 각 개발자마다 이 부분을 속도나 메모리나 GC 등 단점 등을 보완하기 위해 각자의 방법으로 처리합니다.

 

아래는 패킷의 완성이 \r\n 으로 데이터의 끝을 알리는 MS형님의 예제이다.

  • \r\n을 찾을 때까지 들어오는 데이터를 버퍼링 해야 합니다.
  • 버퍼에 반환된 모든 줄 구문을 분석해야 한다.
  • 보낸 사이즈보다 버퍼가 작아 데이터가 덜 들어오는 경우엔 해당 데이터를 저장을 해야 하고 다음 데이터를 이어 받아야 한다.
async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    var bytesBuffered = 0;
    var bytesConsumed = 0;
 
    while (true)
    {
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }
        // Keep track of the amount of buffered bytes
        bytesBuffered += bytesRead;
         
        var linePosition = -1;
 
        do
        {
            // Look for a EOL in the buffered data
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
 
            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset
                var lineLength = linePosition - bytesConsumed;
 
                // Process the line
                ProcessLine(buffer, bytesConsumed, lineLength);
 
                // Move the bytesConsumed to skip past the line we consumed (including \n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}
  • 버퍼 풀을 이용해서 1024만큼의 버퍼 풀링을 시작했으나 1024 사이즈보다 작은 데이터들이 들어오는 경우 메모리를 더 많이 사용한다.
async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;
 
    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer
        var bytesRemaining = buffer.Length - bytesBuffered;
 
        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }
 
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }
         
        // Keep track of the amount of buffered bytes
        bytesBuffered += bytesRead;
         
        do
        {
            // Look for a EOL in the buffered data
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
 
            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset
                var lineLength = linePosition - bytesConsumed;
 
                // Process the line
                ProcessLine(buffer, bytesConsumed, lineLength);
 
                // Move the bytesConsumed to skip past the line we consumed (including \n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}
  • 메모리 낭비를 막기 위해 512보다 작은 경우엔 새로 메모리를 할당한다.
  • 처리량을 늘리기 위해 소켓에 들어온 데이터를 읽는 것과 유저 어플리케이션에 저장된 버퍼를 처리하는 로직을 분산해서 동시에 처리한다.
  • 기타 등등 최적화를 위한 로직이 들어간다면 코드가 복잡해진다.

System.IO.Pipelines가 있는 TCP 서버

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);
 
    return Task.WhenAll(reading, writing);
}
 
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;
 
    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }
 
        // Make the data available to the PipeReader
        FlushResult result = await writer.FlushAsync();
 
        if (result.IsCompleted)
        {
            break;
        }
    }
 
    // Tell the PipeReader that there's no more data coming
    writer.Complete();
}
 
async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
 
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;
 
        do
        {
            // Look for a EOL in the buffer
            position = buffer.PositionOf((byte)'\n');
 
            if (position != null)
            {
                // Process the line
                ProcessLine(buffer.Slice(0, position.Value));
                 
                // Skip the line + the \n character (basically position)
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
            }
        }
        while (position != null);
 
        // Tell the PipeReader how much of the buffer we have consumed
        reader.AdvanceTo(buffer.Start, buffer.End);
 
        // Stop reading if there's no more data coming
        if (result.IsCompleted)
        {
            break;
        }
    }
 
    // Mark the PipeReader as complete
    reader.Complete();
}
  1. https://github.com/dotnet/runtime/blob/57bfe474518ab5b7cfe6bf7424a79ce3af9d6657/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
  2. https://github.com/dotnet/runtime/blob/main/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
    1. 읽기용 데이터의 시작 인덱스 _readHeadIndex 와 읽기용 데이터의 끝 _readTailIndex 
    2. 쓰기용 데이터의 시작 인덱스 _writingHead 와 바이트의 수 _writingHeadBytesBuffered
    3. 이 인덱스들이 내부에서 왔다갔다 하면서 카피 대신에 해당하는 버퍼의 index들의 데이터들을 스택에 올려 반환하면서 처리를 하게 된다.
    4. 스택을 사용함으로써 GC가 돌지 않게 만들 수 있다.

References

https://learn.microsoft.com/en-us/dotnet/standard/io/pipelines?irgwc=1&OCID=AID2200057_aff_7593_1243925&tduid=(ir__qusfgwwd1wkfbzylzocc1exah32xcv00chrqwasx00)(7593)(1243925)(je6NUbpObpQ-QF0.r_GFsgNy_qAr0H6row)()&irclickid=_qusfgwwd1wkfbzylzocc1exah32xcv00chrqwasx00#pipe-basic-usage?ranMID=24542&ranEAID=je6NUbpObpQ&ranSiteID=je6NUbpObpQ-QF0.r_GFsgNy_qAr0H6row&epi=je6NUbpObpQ-QF0.r_GFsgNy_qAr0H6row

https://github.com/davidfowl/TcpEcho

https://blog.naver.com/oidoman/221674992672

https://habr.com/en/post/466137/

반응형

+ Recent posts