Building a High Performance TCP Server in C#

by Aykut Alparslan Koc, October 17th, 2022

Too Long; Didn't Read

ASP.NET Kestrel currently ranks 3rd in plaintext benchmark scores with 7,023,107 responses per second. In this post I will demonstrate how a simple but high performance TCP server can be built borrowing some ideas from the original Kestrel implementation.

When I decided earlier this year to build an open-source Telegram server in C#, I needed a template for the transport layer of the server. I was particularly happy when I checked out the TechEmpower benchmarks, in which ASP.NET Kestrel currently ranks 3rd in plaintext benchmark scores with 7,023,107 responses per second. In this post, I will demonstrate how a high-performance TCP server can be built by borrowing some ideas from Kestrel.


The code presented leverages low-allocation asynchronous programming patterns and pooling to be blazing fast.


Socket operations in .NET are described by SocketAsyncEventArgs, a reusable object that provides a callback-based asynchronous pattern. First, this pattern needs to be wrapped in an awaitable one by implementing the IValueTaskSource<int> interface, so that allocations on the asynchronous path can be avoided.


AwaitableEventArgs.cs

//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs#L10-L77
using System;
using System.Net.Sockets;
using System.Threading.Tasks.Sources;

public class AwaitableEventArgs : SocketAsyncEventArgs, IValueTaskSource<int>
{
    private ManualResetValueTaskSourceCore<int> _source;

    public AwaitableEventArgs() :
        base(unsafeSuppressExecutionContextFlow: true)
    {
    }

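    protected override void OnCompleted(SocketAsyncEventArgs args)
    {
        // Complete the source exactly once: either with an exception or with a result.
        // Calling SetResult after SetException would throw InvalidOperationException.
        if (SocketError != SocketError.Success)
        {
            _source.SetException(new SocketException((int)SocketError));
            return;
        }

        _source.SetResult(BytesTransferred);
    }

    public int GetResult(short token)
    {
        try
        {
            return _source.GetResult(token);
        }
        finally
        {
            // Reset even when GetResult throws, so the instance stays reusable.
            _source.Reset();
        }
    }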

    public ValueTaskSourceStatus GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    public void OnCompleted(Action<object?> continuation, object? state, short token,
        ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}


This is rewritten from the original Kestrel implementation linked in the code comment above. The main difference is that this version uses ManualResetValueTaskSourceCore<int> for the sake of simplicity, while the original uses a thinner custom implementation of the IValueTaskSource interface.
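
To make the reuse pattern easier to see in isolation, here is a minimal sketch with no sockets involved. ReusableSignal and its WaitAsync and Complete methods are hypothetical names invented for this illustration; Complete stands in for the socket completion callback. It takes the token from ManualResetValueTaskSourceCore<int>.Version instead of keeping a separate counter, which is one way to keep the token and the source in step.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical demo type (not part of Kestrel or of the code in this post).
public sealed class ReusableSignal : IValueTaskSource<int>
{
    // Mutable struct: must not be marked readonly.
    private ManualResetValueTaskSourceCore<int> _core;

    // Hands out a ValueTask bound to the current version of the source.
    public ValueTask<int> WaitAsync() => new ValueTask<int>(this, _core.Version);

    // Plays the role of the socket completion callback.
    public void Complete(int value) => _core.SetResult(value);

    public int GetResult(short token)
    {
        try { return _core.GetResult(token); }
        finally { _core.Reset(); } // make the instance reusable for the next await
    }

    public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);

    public void OnCompleted(Action<object?> continuation, object? state, short token,
        ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}

public static class ReusableSignalDemo
{
    public static async Task Main()
    {
        var signal = new ReusableSignal();
        for (int i = 1; i <= 3; i++)
        {
            // The same object backs every ValueTask; nothing is allocated per await.
            ValueTask<int> pending = signal.WaitAsync();
            signal.Complete(i * 10);
            Console.WriteLine(await pending); // 10, then 20, then 30
        }
    }
}
```

Each ValueTask produced this way must be awaited exactly once, because GetResult resets the source for the next operation.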


The required Receiver and Sender implementations derive from the AwaitableEventArgs above. Each first sets the buffer on the SocketAsyncEventArgs and then calls the Socket's ReceiveAsync or SendAsync method. That call returns true if the I/O operation is pending, in which case the Receiver/Sender itself becomes the source of the returned ValueTask, since the base class implements IValueTaskSource<int>. When OnCompleted(SocketAsyncEventArgs args) fires, the awaited ValueTask either yields the number of bytes received/sent or throws an exception. If the call returns false, however, the I/O operation completed synchronously: the completion callback is not raised, and the result has to be read directly from the event args.


Receiver.cs

//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

public class Receiver : AwaitableEventArgs
{
   private short _token;
   public ValueTask<int> ReceiveAsync(Socket socket, Memory<byte> memory)
   {
      SetBuffer(memory);
      if (socket.ReceiveAsync(this))
      {
         return new ValueTask<int>(this, _token++);
      }

      var transferred = BytesTransferred;
      var err = SocketError;
      return err == SocketError.Success
         ? new ValueTask<int>(transferred)
         : ValueTask.FromException<int>(new SocketException((int)err));
   }
}


Sender also needs to handle the case where the data to send is a ReadOnlySequence<byte>, since that is what a PipeReader hands back.


Sender.cs

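//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
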
public class Sender: AwaitableEventArgs
{
    private short _token;
    private List<ArraySegment<byte>>? _buffers;
    public ValueTask<int> SendAsync(Socket socket, in ReadOnlyMemory<byte> data)
    {
        SetBuffer(MemoryMarshal.AsMemory(data));
        if (socket.SendAsync(this))
        {
            return new ValueTask<int>(this, _token++);
        }

        var transferred = BytesTransferred;
        var err = SocketError;
        return err == SocketError.Success
            ? new ValueTask<int>(transferred)
            : ValueTask.FromException<int>(new SocketException((int)err));
    }
    public ValueTask<int> SendAsync(Socket socket, in ReadOnlySequence<byte> data)
    {
        if (data.IsSingleSegment)
        {
            return SendAsync(socket, data.First);
        }
        _buffers ??= new List<ArraySegment<byte>>();
        foreach (var buff in data)
        {
            if (!MemoryMarshal.TryGetArray(buff, out var array))
            {
                throw new InvalidOperationException("Buffer is not backed by an array.");
            }

            _buffers.Add(array);
        }

        BufferList = _buffers;

        if (socket.SendAsync(this))
        {
            return new ValueTask<int>(this, _token++);
        }

        var transferred = BytesTransferred;
        var err = SocketError;
        return err == SocketError.Success
            ? new ValueTask<int>(transferred)
            : ValueTask.FromException<int>(new SocketException((int)err));
    }
    public void Reset()
    {
        if (BufferList != null)
        {
            BufferList = null;

            _buffers?.Clear();
        }
        else
        {
            SetBuffer(null, 0, 0);
        }
    }
}


A lightweight SenderPool implementation is also needed. It is backed by a ConcurrentQueue<Sender> and uses Interlocked to keep an approximate count, which is cheaper than asking the queue for its Count.


SenderPool.cs

//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs
using System;
using System.Collections.Concurrent;
using System.Threading;

public class SenderPool : IDisposable
{
    private readonly int MaxNumberOfSenders;
    private int _count;
    private readonly ConcurrentQueue<Sender> _senders = new();
    private bool _disposed = false;

    public SenderPool(int maxNumberOfSenders = 128)
    {
        MaxNumberOfSenders = maxNumberOfSenders;
    }

    public Sender Rent()
    {
        if (_senders.TryDequeue(out var sender))
        {
            Interlocked.Decrement(ref _count);
            sender.Reset();
            return sender;
        }

        return new Sender();
    }

    public void Return(Sender sender)
    {
        if (_disposed || _count >= MaxNumberOfSenders)
        {
            sender.Dispose();
        }
        else
        {
            Interlocked.Increment(ref _count);
            _senders.Enqueue(sender);
        }
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        while (_senders.TryDequeue(out var sender))
        {
            sender.Dispose();
        }
    }
}
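
The pooling pattern itself can be shown without any sockets. The sketch below is a possible refinement rather than the code used in this post: it increments the counter before comparing it with the limit, so that concurrent callers cannot all pass the capacity check at once. BoundedPool<T> and PooledItem are hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical generic pool illustrating the ConcurrentQueue + Interlocked pattern.
public sealed class BoundedPool<T> where T : class, new()
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly int _max;
    private int _count; // approximate number of pooled items

    public BoundedPool(int max) => _max = max;

    public T Rent()
    {
        if (_items.TryDequeue(out var item))
        {
            Interlocked.Decrement(ref _count);
            return item;
        }
        return new T(); // pool is empty, create a fresh instance
    }

    // Returns false when the pool is full; the caller should then dispose or drop the item.
    public bool Return(T item)
    {
        if (Interlocked.Increment(ref _count) > _max)
        {
            Interlocked.Decrement(ref _count); // undo the reservation
            return false;
        }
        _items.Enqueue(item);
        return true;
    }
}

public sealed class PooledItem { }

public static class BoundedPoolDemo
{
    public static void Main()
    {
        var pool = new BoundedPool<PooledItem>(1);
        PooledItem a = pool.Rent();
        PooledItem b = pool.Rent();
        Console.WriteLine(pool.Return(a));                     // True: there is room for one item
        Console.WriteLine(pool.Return(b));                     // False: the pool is already full
        Console.WriteLine(ReferenceEquals(pool.Rent(), a));    // True: the pooled instance is reused
    }
}
```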


Now this is the part where things start to get more fun. Writing a TCP server requires buffering and parsing streaming data, which is tedious and hard to get right the first time around. Luckily, System.IO.Pipelines is designed for exactly this problem. It reduces code complexity and allows high-performance parsing of streaming data. I highly recommend that anyone reading this also read the original article to get a better grasp. The Connection class in this post is built on this library.
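
For readers who have not used the library before, here is a small standalone sketch of the parsing side. It reads newline-delimited ASCII lines from a PipeReader, including a line that arrives split across two writes. LineReaderDemo, ReadLinesAsync and TryReadLine are names chosen for this illustration, and the newline framing is an assumption made for the example, not the protocol of the server in this post.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class LineReaderDemo
{
    // Reads newline-delimited ASCII lines until the writer side completes.
    public static async Task<List<string>> ReadLinesAsync(PipeReader reader)
    {
        var lines = new List<string>();
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                lines.Add(Encoding.ASCII.GetString(line.ToArray()));
            }

            // Consumed: everything up to the last complete line. Examined: all of it.
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }
        reader.Complete();
        return lines;
    }

    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer,
        out ReadOnlySequence<byte> line)
    {
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (newline == null)
        {
            line = default;
            return false; // no complete line yet, wait for more data
        }
        line = buffer.Slice(0, newline.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value)); // skip the '\n'
        return true;
    }

    public static async Task Main()
    {
        var pipe = new Pipe();
        // The second line arrives in two pieces, as it might on a socket.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("hello\nwor"));
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("ld\n"));
        pipe.Writer.Complete();

        foreach (string line in await ReadLinesAsync(pipe.Reader))
        {
            Console.WriteLine(line); // hello, then world
        }
    }
}
```

The two-argument AdvanceTo call is what removes the manual buffer bookkeeping: unconsumed bytes stay in the pipe and are handed back together with the next chunk.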


Connection.cs

//From: https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs
using System;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading.Tasks;

public class Connection : IAsyncDisposable
{
    private const int MinBuffSize = 1024;
    private readonly Socket _socket;
    private readonly Receiver _receiver;
    private Sender? _sender;
    private readonly SenderPool _senderPool;
    private Task? _receiveTask;
    private Task? _sendTask;
    private readonly Pipe _transportPipe;
    private readonly Pipe _applicationPipe;
    private readonly object _shutdownLock = new object();
    private volatile bool _socketDisposed;
    public PipeWriter Output { get;}
    public PipeReader Input { get;}

    public Connection(Socket socket, SenderPool senderPool)
    {
        _socket = socket;
        _receiver = new Receiver();
        _senderPool = senderPool;
        _transportPipe = new Pipe();
        Output = _transportPipe.Writer;
        _applicationPipe = new Pipe();
        Input = _applicationPipe.Reader;
    }

    public void Start()
    {
        try
        {
            _sendTask = SendLoop();
            _receiveTask = ReceiveLoop();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }
    private async Task SendLoop()
    {
        try
        {
            while (true)
            {
                var result = await _transportPipe.Reader.ReadAsync();
                if (result.IsCanceled)
                {
                    break;
                }
                var buff = result.Buffer;
                if (!buff.IsEmpty)
                {
                    _sender = _senderPool.Rent();
                    await _sender.SendAsync(_socket, result.Buffer);
                    _senderPool.Return(_sender);
                    _sender = null;
                }
                _transportPipe.Reader.AdvanceTo(buff.End);
                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
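        finally
        {
            // The send loop owns the reading end of the transport pipe.
            _transportPipe.Reader.Complete();
            // Un-pause a ReceiveLoop that may be waiting on FlushAsync.
            _applicationPipe.Writer.CancelPendingFlush();
            Shutdown();
        }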
    }
    private async Task ReceiveLoop()
    {
        try
        {
            while (true)
            {
                var buff = _applicationPipe.Writer.GetMemory(MinBuffSize);
                var bytes = await _receiver.ReceiveAsync(_socket, buff);
                if (bytes == 0)
                {
                    break;
                }
                _applicationPipe.Writer.Advance(bytes);
                var result = await _applicationPipe.Writer.FlushAsync();
                if (result.IsCanceled || result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
        finally
        {
            _applicationPipe.Writer.Complete();
            Shutdown();
        }
    }

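    public async ValueTask DisposeAsync()
    {
        // Complete the application-facing ends. Completing Output wakes a SendLoop
        // that is waiting for data; completing Input releases a ReceiveLoop that is
        // waiting on a flush. Otherwise the awaits below might never finish.
        _transportPipe.Writer.Complete();
        _applicationPipe.Reader.Complete();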
        try
        {
            if (_receiveTask != null)
            {
                await _receiveTask;
            }

            if (_sendTask != null)
            {
                await _sendTask;
            }
        }
        finally
        {
            _receiver.Dispose();
            _sender?.Dispose();
        }
    }
    public void Shutdown()
    {
        lock (_shutdownLock)
        {
            if (_socketDisposed)
            {
                return;
            }
            _socketDisposed = true;
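            try
            {
                _socket.Shutdown(SocketShutdown.Both);
            }
            catch (SocketException)
            {
                // The socket may already be disconnected; ignore, we are tearing down anyway.
            }
            finally
            {
                _socket.Dispose();
            }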
        }
    }
}


The Connection class has:

  • a Receiver for the connection
  • references to the Socket and the SenderPool
  • a Pipe for transport and another one for the application
  • and finally two loops for sending and receiving.

So how do we send/receive data?

The PipeWriter of the transport pipe is exposed as the public Output property. The SendLoop waits for data written to this pipe, rents a Sender before every send operation, and returns it to the pool after completion. The PipeReader of the application pipe is exposed as the public Input property. The ReceiveLoop gets a buffer from this pipe, receives into it, and flushes so that the data becomes visible to the reader. Then there is some cleanup code. This again is a simplified version of the original Kestrel implementation.

TCP Echo

At this point, a TCP echo server needs just a few more lines of code. The following simply copies the received bytes back to the sender:


Program.cs

using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

var senderPool = new SenderPool(1024);
var listenSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, 8989));
listenSocket.Listen(128);
while (true)
{
    var socket = await listenSocket.AcceptAsync();
    var connection = new Connection(socket, senderPool);
    _ = ProcessConnection(connection);
}

static async Task ProcessConnection(Connection connection)
{
    connection.Start();
    while (true)
    {
        var result = await connection.Input.ReadAsync();
        if (result.IsCanceled)
        {
            break;
        }
        var buff = result.Buffer;
        if (!buff.IsEmpty)
        {
            if (buff.IsSingleSegment)
            {
                await connection.Output.WriteAsync(buff.First);
            }
            else
            {
                foreach (var mem in buff)
                {
                    await connection.Output.WriteAsync(mem);
                }
            }
        }
        connection.Input.AdvanceTo(buff.End, buff.End);
        if (result.IsCompleted)
        {
            break;
        }
    }
    connection.Shutdown();
    await connection.DisposeAsync();
}
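
To try the server out, a small client is enough. The following is a minimal sketch that assumes the echo server above is already listening on 127.0.0.1:8989. EchoClient and RoundTripAsync are names invented here; the client uses plain TcpClient and NetworkStream rather than the pipeline-based code in this post.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

public static class EchoClient
{
    // Sends one message and reads until the same number of bytes has come back.
    public static async Task<string> RoundTripAsync(IPEndPoint endpoint, string message)
    {
        using var client = new TcpClient();
        await client.ConnectAsync(endpoint.Address, endpoint.Port);
        NetworkStream stream = client.GetStream();

        byte[] payload = Encoding.UTF8.GetBytes(message);
        await stream.WriteAsync(payload, 0, payload.Length);

        // TCP is a byte stream, so the echo may arrive in several pieces.
        var received = new byte[payload.Length];
        int total = 0;
        while (total < received.Length)
        {
            int n = await stream.ReadAsync(received, total, received.Length - total);
            if (n == 0)
            {
                break; // the server closed the connection
            }
            total += n;
        }
        return Encoding.UTF8.GetString(received, 0, total);
    }

    public static async Task Main()
    {
        // Assumes the echo server from this post is running on port 8989.
        var endpoint = new IPEndPoint(IPAddress.Loopback, 8989);
        Console.WriteLine(await RoundTripAsync(endpoint, "ping"));
    }
}
```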


This will serve as a base for a high-performance TCP server, but of course more measurements and optimizations might be necessary. The code above omits some more advanced implementation details for the sake of simplicity. Some of them are outlined below.

schedule-me-not

When creating a Pipe, it is possible to specify a MemoryPool<byte> and PipeSchedulers for the reader and the writer via PipeOptions. Kestrel uses a custom PinnedBlockMemoryPool here along with a lock-free IOQueue. Its SocketAwaitableEventArgs implementation also accepts a PipeScheduler, which gives fine-grained control over where the I/O continuations run. The following pseudo-code tries to simplify and outline how Kestrel utilizes schedulers.



// Pseudo-code: PinnedBlockMemoryPool, IOQueue, SocketReceiver and SocketSenderPool
// are Kestrel types; maxReadBufferSize and maxWriteBufferSize are assumed to be
// defined elsewhere.
var memoryPool = new PinnedBlockMemoryPool();
var applicationScheduler = PipeScheduler.ThreadPool;
var transportScheduler = new IOQueue();
var socketScheduler = transportScheduler;

var receiver = new SocketReceiver(socketScheduler);

// Input pipe: the application reads, the transport writes.
var inputOptions = new PipeOptions(memoryPool,
        applicationScheduler, transportScheduler,
        maxReadBufferSize, maxReadBufferSize / 2,
        useSynchronizationContext: false);

// Output pipe: the transport reads, the application writes.
var outputOptions = new PipeOptions(memoryPool,
        transportScheduler, applicationScheduler,
        maxWriteBufferSize, maxWriteBufferSize / 2,
        useSynchronizationContext: false);

var socketSenderPool = new SocketSenderPool(PipeScheduler.Inline);


As can be seen above, PauseWriterThreshold and ResumeWriterThreshold (the fourth and fifth PipeOptions arguments) are used for flow control. This matters when processing the data is more expensive than simply sending it back, because without a limit the pipe would buffer more and more memory while the reader falls behind. When PauseWriterThreshold is set and the amount of unconsumed data in the pipe exceeds it, PipeWriter.FlushAsync does not complete until the amount drops below ResumeWriterThreshold again.
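
The effect is easy to observe with a standalone Pipe. The sketch below uses deliberately small thresholds (1024 and 512 bytes, chosen only for the demonstration) and default schedulers; BackpressureDemo and RunAsync are names made up for this example.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Returns true if the flush was still pending before the reader consumed anything.
    public static async Task<bool> RunAsync()
    {
        var pipe = new Pipe(new PipeOptions(
            pauseWriterThreshold: 1024,
            resumeWriterThreshold: 512,
            useSynchronizationContext: false));

        // 2048 unconsumed bytes exceed the pause threshold, so the flush stays pending.
        ValueTask<FlushResult> flush = pipe.Writer.WriteAsync(new byte[2048]);
        bool paused = !flush.IsCompleted;

        // Consuming everything drops the pipe below the resume threshold.
        ReadResult read = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(read.Buffer.End);

        await flush; // completes now that the reader has caught up
        return paused;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

In the Connection class above, this is the mechanism that would make the ReceiveLoop's FlushAsync wait, and therefore stop reading from the socket, while the application is still busy with earlier data.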

Where to go from here?

As I was writing the code for this post, some preliminary benchmarks showed that the code presented is actually somewhat slower than a basic Netty echo server. Netty reached 115,000 requests per second, while the code above reached 110,000. Netty, however, is a well-established framework, so this shouldn't be too surprising. It just shows that proper benchmarks are needed, along with profiling and optimization.


The full source can be viewed at the GitHub repo.