When I decided earlier this year to build an open-source Telegram server, and to build it in C#, I needed a template for the transport layer of the server. I was particularly happy when I checked the TechEmpower benchmarks, in which ASP.NET Kestrel currently ranks 3rd in the plaintext category with 7,023,107 responses per second. In this post, I will demonstrate how a high-performance TCP server can be built, borrowing some ideas from Kestrel.
The code presented leverages low-allocation asynchronous programming patterns and pooling to be blazing fast.
Socket operations in .NET are described by SocketAsyncEventArgs, a reusable object that provides a callback-based asynchronous pattern. This pattern first needs to be wrapped with an awaitable one by implementing the IValueTaskSource<int> interface, so that allocations on the asynchronous path can be avoided.
AwaitableEventArgs.cs
//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs#L10-L77
public class AwaitableEventArgs : SocketAsyncEventArgs, IValueTaskSource<int>
{
private ManualResetValueTaskSourceCore<int> _source;
public AwaitableEventArgs() :
base(unsafeSuppressExecutionContextFlow: true)
{
}
protected override void OnCompleted(SocketAsyncEventArgs args)
{
if (SocketError != SocketError.Success)
{
// Complete the pending operation with the error and stop here;
// the value task source must not be completed twice.
_source.SetException(new SocketException((int)SocketError));
return;
}
_source.SetResult(BytesTransferred);
}
public int GetResult(short token)
{
// Reset even if GetResult throws, so the instance stays reusable.
try
{
return _source.GetResult(token);
}
finally
{
_source.Reset();
}
}
public ValueTaskSourceStatus GetStatus(short token)
{
return _source.GetStatus(token);
}
public void OnCompleted(Action<object?> continuation, object? state, short token,
ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}
This is rewritten from the original implementation here; the main difference is that this version uses ManualResetValueTaskSourceCore<int> for the sake of simplicity, while the original uses a thinner custom implementation of the IValueTaskSource<int> interface.
The Receiver and Sender implementations needed are derived from the AwaitableEventArgs above. They first set the buffer on the SocketAsyncEventArgs and then call the respective ReceiveAsync or SendAsync method of the Socket, which returns true if the I/O operation is pending. In that case the Receiver/Sender itself can be the source of the returned ValueTask, since the base class implements the interface; when OnCompleted(SocketAsyncEventArgs args) fires, the awaited ValueTask will either yield the bytes received/sent or throw an exception. If the call returns false, however, the I/O operation completed synchronously and the synchronous path needs to be handled.
Receiver.cs
//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs
public class Receiver : AwaitableEventArgs
{
private short _token;
public ValueTask<int> ReceiveAsync(Socket socket, Memory<byte> memory)
{
SetBuffer(memory);
if (socket.ReceiveAsync(this))
{
return new ValueTask<int>(this, _token++);
}
var transferred = BytesTransferred;
var err = SocketError;
return err == SocketError.Success
? new ValueTask<int>(transferred)
: ValueTask.FromException<int>(new SocketException((int)err));
}
}
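As a quick illustration of how the wrapper is consumed, here is a minimal sketch (not part of the server, and assuming an already connected Socket) that reuses one Receiver for every read; the ReceiveLoop shown later does essentially the same thing against a pipe buffer:

// Sketch: a single Receiver is reused for every read, so the asynchronous
// path completes through IValueTaskSource<int> without allocating a Task
// per operation.
static async Task ReadUntilClosed(Socket socket)
{
    using var receiver = new Receiver();
    var buffer = new byte[4096];
    while (true)
    {
        var bytes = await receiver.ReceiveAsync(socket, buffer);
        if (bytes == 0)
        {
            break; // the peer closed the connection
        }
        // ... process buffer[0..bytes] here ...
    }
}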
The Sender also needs to handle the buffer being a ReadOnlySequence<byte>, since that is what the transport pipe's reader will hand it when the data spans multiple segments.
Sender.cs
//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs
public class Sender: AwaitableEventArgs
{
private short _token;
private List<ArraySegment<byte>>? _buffers;
public ValueTask<int> SendAsync(Socket socket, in ReadOnlyMemory<byte> data)
{
SetBuffer(MemoryMarshal.AsMemory(data));
if (socket.SendAsync(this))
{
return new ValueTask<int>(this, _token++);
}
var transferred = BytesTransferred;
var err = SocketError;
return err == SocketError.Success
? new ValueTask<int>(transferred)
: ValueTask.FromException<int>(new SocketException((int)err));
}
public ValueTask<int> SendAsync(Socket socket, in ReadOnlySequence<byte> data)
{
if (data.IsSingleSegment)
{
return SendAsync(socket, data.First);
}
_buffers ??= new List<ArraySegment<byte>>();
foreach (var buff in data)
{
if (!MemoryMarshal.TryGetArray(buff, out var array))
{
throw new InvalidOperationException("Buffer is not backed by an array.");
}
_buffers.Add(array);
}
BufferList = _buffers;
if (socket.SendAsync(this))
{
return new ValueTask<int>(this, _token++);
}
var transferred = BytesTransferred;
var err = SocketError;
return err == SocketError.Success
? new ValueTask<int>(transferred)
: ValueTask.FromException<int>(new SocketException((int)err));
}
public void Reset()
{
if (BufferList != null)
{
BufferList = null;
_buffers?.Clear();
}
else
{
SetBuffer(null, 0, 0);
}
}
}
A lightweight SenderPool implementation is also needed. It is backed by a ConcurrentQueue<Sender> and uses Interlocked to keep an approximate atomic count, which is cheaper than querying ConcurrentQueue.Count.
SenderPool.cs
//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs
public class SenderPool : IDisposable
{
private readonly int MaxNumberOfSenders;
private int _count;
private readonly ConcurrentQueue<Sender> _senders = new();
private bool _disposed = false;
public SenderPool(int maxNumberOfSenders = 128)
{
MaxNumberOfSenders = maxNumberOfSenders;
}
public Sender Rent()
{
if (_senders.TryDequeue(out var sender))
{
Interlocked.Decrement(ref _count);
sender.Reset();
return sender;
}
return new Sender();
}
public void Return(Sender sender)
{
if (_disposed || _count >= MaxNumberOfSenders)
{
sender.Dispose();
}
else
{
Interlocked.Increment(ref _count);
_senders.Enqueue(sender);
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
while (_senders.TryDequeue(out var sender))
{
sender.Dispose();
}
}
}
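In isolation, the rent/send/return pattern looks like the sketch below (a hypothetical helper, not part of the server; the SendLoop in the Connection class further down does the same thing with data read from a pipe):

// Sketch: rent a pooled Sender, perform one send and return it so that
// other connections can reuse the same SocketAsyncEventArgs instance.
static async Task SendOnce(Socket socket, SenderPool pool, ReadOnlyMemory<byte> payload)
{
    var sender = pool.Rent();
    try
    {
        await sender.SendAsync(socket, payload);
    }
    finally
    {
        pool.Return(sender);
    }
}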
Now this is where things start to get more fun. Writing a TCP server requires buffering and parsing streaming data, which is complex, tedious, and hard to get right the first time around. Luckily, System.IO.Pipelines is designed exactly for this problem: it reduces code complexity and enables high-performance parsing of streaming data. I highly recommend anyone reading this to read the original article as well to get a better grasp.
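To get a feel for the programming model before it is wired up to sockets, here is a small, self-contained sketch (independent of the server code, assuming the usual System.IO.Pipelines, System.Buffers and System.Text usings) that writes a few bytes into a Pipe and parses newline-delimited messages back out of it:

// Sketch: the writer side pushes raw bytes, the reader side slices complete
// lines out of the buffered ReadOnlySequence<byte> without copying it around.
var pipe = new Pipe();

await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("hello\nworld\n"));
pipe.Writer.Complete(); // no more data will be written

while (true)
{
    var result = await pipe.Reader.ReadAsync();
    var buffer = result.Buffer;
    SequencePosition? newline;
    while ((newline = buffer.PositionOf((byte)'\n')) != null)
    {
        var line = buffer.Slice(0, newline.Value);
        Console.WriteLine(Encoding.UTF8.GetString(line.ToArray()));
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
    }
    // Everything before buffer.Start is consumed; the rest has been examined.
    pipe.Reader.AdvanceTo(buffer.Start, buffer.End);
    if (result.IsCompleted)
    {
        break;
    }
}
pipe.Reader.Complete();

The next piece of code, the Connection class, leverages this library to move data between the socket and the application.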
Connection.cs
//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs
public class Connection : IAsyncDisposable
{
private const int MinBuffSize = 1024;
private readonly Socket _socket;
private readonly Receiver _receiver;
private Sender? _sender;
private readonly SenderPool _senderPool;
private Task? _receiveTask;
private Task? _sendTask;
private readonly Pipe _transportPipe;
private readonly Pipe _applicationPipe;
private readonly object _shutdownLock = new object();
private volatile bool _socketDisposed;
public PipeWriter Output { get;}
public PipeReader Input { get;}
public Connection(Socket socket, SenderPool senderPool)
{
_socket = socket;
_receiver = new Receiver();
_senderPool = senderPool;
_transportPipe = new Pipe();
Output = _transportPipe.Writer;
_applicationPipe = new Pipe();
Input = _applicationPipe.Reader;
}
public void Start()
{
try
{
_sendTask = SendLoop();
_receiveTask = ReceiveLoop();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
private async Task SendLoop()
{
try
{
while (true)
{
var result = await _transportPipe.Reader.ReadAsync();
if (result.IsCanceled)
{
break;
}
var buff = result.Buffer;
if (!buff.IsEmpty)
{
_sender = _senderPool.Rent();
await _sender.SendAsync(_socket, result.Buffer);
_senderPool.Return(_sender);
_sender = null;
}
_transportPipe.Reader.AdvanceTo(buff.End);
if (result.IsCompleted)
{
break;
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
finally
{
_applicationPipe.Writer.Complete();
Shutdown();
}
}
private async Task ReceiveLoop()
{
try
{
while (true)
{
var buff = _applicationPipe.Writer.GetMemory(MinBuffSize);
var bytes = await _receiver.ReceiveAsync(_socket, buff);
if (bytes == 0)
{
break;
}
_applicationPipe.Writer.Advance(bytes);
var result = await _applicationPipe.Writer.FlushAsync();
if (result.IsCanceled || result.IsCompleted)
{
break;
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
finally
{
_applicationPipe.Writer.Complete();
Shutdown();
}
}
public async ValueTask DisposeAsync()
{
// Complete the application-facing ends (Output/Input) so that both loops can observe completion and exit.
Output.Complete();
Input.Complete();
try
{
if (_receiveTask != null)
{
await _receiveTask;
}
if (_sendTask != null)
{
await _sendTask;
}
}
finally
{
_receiver.Dispose();
_sender?.Dispose();
}
}
public void Shutdown()
{
lock (_shutdownLock)
{
if (_socketDisposed)
{
return;
}
_socketDisposed = true;
try
{
_socket.Shutdown(SocketShutdown.Both);
}
finally
{
_socket.Dispose();
}
}
}
}
The Connection class has:
- a Receiver for the connection
- the Socket and the SenderPool
- a Pipe for transport and another one for the application

The PipeWriter for the transport pipe is exposed as the public Output property, and the SendLoop polls for data written to this pipe, renting a Sender before every send operation and returning it to the pool after completion. The PipeReader for the application pipe is exposed as the public Input property, and the ReceiveLoop basically gets a buffer from this pipe and receives into it. Then there is some cleanup code. This again is a simplified version of the original Kestrel implementation.
At this point, a TCP echo server needs just a few more lines of code. The following simply copies the received bytes back:
Program.cs
var senderPool = new SenderPool(1024);
var listenSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, 8989));
listenSocket.Listen(128);
while (true)
{
var socket = await listenSocket.AcceptAsync();
var connection = new Connection(socket, senderPool);
_ = ProcessConnection(connection);
}
static async Task ProcessConnection(Connection connection)
{
connection.Start();
while (true)
{
var result = await connection.Input.ReadAsync();
if (result.IsCanceled)
{
break;
}
var buff = result.Buffer;
if (!buff.IsEmpty)
{
if (buff.IsSingleSegment)
{
await connection.Output.WriteAsync(buff.First);
}
else
{
foreach (var mem in buff)
{
await connection.Output.WriteAsync(mem);
}
}
}
connection.Input.AdvanceTo(buff.End, buff.End);
if (result.IsCompleted)
{
break;
}
}
connection.Shutdown();
await connection.DisposeAsync();
}
This will serve as a base for a high-performance TCP server, but of course more measurements and optimizations might be necessary. The code above omits some more advanced implementation details for the sake of simplicity; some of them are outlined below.
When creating a Pipe, it is possible to specify a MemoryPool<byte> and PipeSchedulers for the reader and the writer via PipeOptions, and Kestrel uses a custom PinnedBlockMemoryPool here along with a lock-free IOQueue. The SocketAwaitableEventArgs implementation also supports a PipeScheduler, so that there is fine-grained control over where the I/O continuations run. The following pseudo-code tries to simplify and outline how Kestrel utilizes these schedulers.
var memoryPool = new PinnedBlockMemoryPool();
var applicationScheduler = PipeScheduler.ThreadPool;
var transportScheduler = new IOQueue();
var socketScheduler = transportScheduler;
var receiver = new SocketReceiver(socketScheduler);
var inputOptions = new PipeOptions(memoryPool,
applicationScheduler, transportScheduler,
maxReadBufferSize, maxReadBufferSize / 2,
useSynchronizationContext: false);
var outputOptions = new PipeOptions(memoryPool,
transportScheduler, applicationScheduler,
maxWriteBufferSize, maxWriteBufferSize / 2,
useSynchronizationContext: false);
var socketSenderPool = new SocketSenderPool(PipeScheduler.Inline);
It can also be seen above that PauseWriterThreshold and ResumeWriterThreshold are used for flow control. This matters when processing data is more complicated than simply sending it back, in which case a slow reader would otherwise cause the pipe to allocate more and more memory. When PauseWriterThreshold is set, PipeWriter.FlushAsync does not complete while the amount of unconsumed data in the pipe exceeds that value; it only completes again once the amount drops below ResumeWriterThreshold.
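In the simplified Connection above both pipes are created with new Pipe() and default options; to set these thresholds explicitly they can be passed through PipeOptions, roughly like this (a sketch, the limits are arbitrary):

// Sketch: with these options, FlushAsync on the pipe's writer stops
// completing once more than 64 KB of unconsumed data is buffered, and only
// resumes once the reader has drained it below 32 KB.
var applicationPipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 64 * 1024,
    resumeWriterThreshold: 32 * 1024,
    useSynchronizationContext: false));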
As I was writing the code for this post, some preliminary benchmarks showed that the code presented is actually somewhat slower than a basic Netty echo server: Netty would reach about 115,000 requests per second while the code above would reach about 110,000. Netty, however, is a well-established framework, so this shouldn't be too surprising. It just shows that proper benchmarks are needed, along with profiling and optimization.
The full source can be viewed at the GitHub repo.