소켓 버퍼로부터 유저 어플리케이션단으로 데이터를 가져오기 위해 해당 버퍼 만큼의 메모리를 할당하고 값을 복사하는 작업 등이 필요하고 각 개발자마다 이 부분을 속도나 메모리나 GC 등 단점 등을 보완하기 위해 각자의 방법으로 처리합니다.
아래는 패킷의 완성이 \r\n 으로 데이터의 끝을 알리는 MS형님의 예제이다.
\r\n을 찾을 때까지 들어오는 데이터를 버퍼링 해야 합니다.
버퍼에 반환된 모든 줄 구문을 분석해야 한다.
보낸 사이즈보다 버퍼가 작아 데이터가 덜 들어오는 경우엔 해당 데이터를 저장을 해야 하고 다음 데이터를 이어 받아야 한다.
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset
var lineLength = linePosition - bytesConsumed;
// Process the line
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line we consumed (including \n)
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
버퍼 풀을 이용해서 1024만큼의 버퍼 풀링을 시작했으나 1024 사이즈보다 작은 데이터들이 들어오는 경우 메모리를 더 많이 사용한다.
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes
bytesBuffered += bytesRead;
do
{
// Look for a EOL in the buffered data
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset
var lineLength = linePosition - bytesConsumed;
// Process the line
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line we consumed (including \n)
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
메모리 낭비를 막기 위해 512보다 작은 경우엔 새로 메모리를 할당한다.
처리량을 늘리기 위해 소켓에 들어온 데이터를 읽는 것과 유저 어플리케이션에 저장된 버퍼를 처리하는 로직을 분산해서 동시에 처리한다.
기타 등등 최적화를 위한 로직이 들어간다면 코드가 복잡해진다.
System.IO.Pipelines가 있는 TCP 서버
new Pipie로 시작을 한다. 이 객체는 내부적으로 LOH 힙에 메모리를 만들며 쓰레드로부터 보호되는 객체를 생성한다.
아래 코드는 소켓의 스트림을 읽어 파이프 안의 메모리에 저장이 되고 그 파이프를 처리하는 로직이 존재하기에 Task가 2개가 생성된 것이다. 동시 처리가 필요없다면 하나만으로도 처리가 가능하다.
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
return Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Tell the PipeReader that there's no more data coming
writer.Complete();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
// Look for a EOL in the buffer
position = buffer.PositionOf((byte)'\n');
if (position != null)
{
// Process the line
ProcessLine(buffer.Slice(0, position.Value));
// Skip the line + the \n character (basically position)
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
while (position != null);
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete
reader.Complete();
}
서비스 파일이 없이 서비스를 등록하려고 하면 Unit not found. 해당 에러가 발생한다.
systemctl enable ba.internalserverd
파일을 생성해준다. 현재 개인 프로젝트의 서버 등록을 위해 ba.internalserverd.service 로 파일을 만들어준다.
vi /etc/systemd/system/ba.internalserverd.service
[Unit]
Description=BA.InterServer.dll
[Service]
WorkingDirectory=/ba/bin/interserver
ExecStart=/usr/bin/dotnet /ba/bin/interserver/BA.InterServer.dll
Restart=always
# Restart service after 10 seconds if the dotnet service crashes:
RestartSec=10
KillSignal=SIGINT
SyslogIdentifier=BA.InterServer
[Install]
WantedBy=multi-user.target