Ring clients

The reactor serves HTTP. Everything else a backend talks to - Postgres, Redis, files, an upstream service - is a client riding the same ring, written against one seam. The engine never names a client type, so adding a capability never touches the reactor.

The seam: IRingHost

public interface IRingHost
{
    // Stream I/O: sockets.
    void SubmitConnect(int fd, nint sockaddr, int len, IRingCompletion completion);
    void SubmitSend   (int fd, nint buffer, int len, IRingCompletion completion);
    void SubmitRecv   (int fd, nint buffer, int len, IRingCompletion completion);

    // Positional I/O: files.
    void SubmitRead   (int fd, nint buffer, int len, long offset, IRingCompletion completion);
    void SubmitWrite  (int fd, nint buffer, int len, long offset, IRingCompletion completion);
}

public interface IRingCompletion
{
    void Complete(int result);   // bytes transferred (0 for connect), or a negative errno
}

Every submission carries its own completion object. The reactor stores it in a slot table, puts the slot index in the SQE's user_data, and when the CQE lands calls Complete - inline, on the reactor thread. Routing is per operation, not per descriptor: two concurrent reads of the same file, or a send racing a recv on one socket, never collide, and nothing is registered or unbound around descriptor lifetimes.
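The per-operation routing described above can be sketched as a small slot table. This is a minimal sketch under assumptions, not the reactor's actual code: SlotTable, Store, Dispatch, and Recorder are hypothetical names, and an array with a free list stands in for whatever structure the engine really uses. IRingCompletion is redeclared only so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public interface IRingCompletion
{
    void Complete(int result);
}

// Hypothetical slot table: maps an SQE's user_data back to its completion.
public sealed class SlotTable
{
    private IRingCompletion?[] _slots = new IRingCompletion?[64];
    private readonly Stack<int> _free = new();
    private int _next;

    // Called at submit time; the returned index goes into the SQE's user_data.
    public ulong Store(IRingCompletion completion)
    {
        int index;
        if (_free.Count > 0)
        {
            index = _free.Pop();
        }
        else
        {
            if (_next == _slots.Length)
                Array.Resize(ref _slots, _slots.Length * 2);
            index = _next++;
        }
        _slots[index] = completion;
        return (ulong)index;
    }

    // Called when a CQE lands: free the slot first, then complete inline,
    // so a continuation that submits again can reuse the slot.
    public void Dispatch(ulong userData, int result)
    {
        int index = (int)userData;
        IRingCompletion completion = _slots[index]
            ?? throw new InvalidOperationException($"slot {index} is empty");
        _slots[index] = null;
        _free.Push(index);
        completion.Complete(result);
    }
}

// Hypothetical completion used only to observe results.
public sealed class Recorder : IRingCompletion
{
    public int? Result;
    public void Complete(int result) => Result = result;
}
```

Two Recorder instances stored for operations on the same descriptor get different slot indices, so dispatching one never touches the other. The table knows nothing about descriptors at all.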

The kit

RingOpSource - one CQE becomes one await

private readonly RingOpSource _op = new();

public ValueTask<int> ReadAsync(int fd, nint buffer, int length, long offset)
{
    var pending = _op.Prepare();                       // arm - zero allocation
    _host.SubmitRead(fd, buffer, length, offset, _op); // the source IS the completion
    return pending;                                    // resumes inline on the CQE
}

It implements both IRingCompletion and IValueTaskSource<int> and is reusable across operations. One op in flight per source at a time - a duplex client owns two (one per direction).
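One way such a source could be built is on top of the BCL's ManualResetValueTaskSourceCore<int>. The sketch below is an assumption about the shape, not RingOpSource's actual implementation; OpSourceSketch and its _armed guard are hypothetical, and IRingCompletion is redeclared so the block stands alone.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

public interface IRingCompletion
{
    void Complete(int result);
}

// Hypothetical reusable source: one operation in flight at a time.
public sealed class OpSourceSketch : IRingCompletion, IValueTaskSource<int>
{
    // Mutable struct, so the field must not be readonly. Continuations run
    // inline inside SetResult because RunContinuationsAsynchronously is false.
    private ManualResetValueTaskSourceCore<int> _core;
    private bool _armed;

    // Arm for one operation and hand back the awaitable. No allocation:
    // the ValueTask only wraps this object and a version token.
    public ValueTask<int> Prepare()
    {
        if (_armed)
            throw new InvalidOperationException("an operation is already in flight on this source");
        _armed = true;
        _core.Reset();   // bumps Version so a stale ValueTask is rejected
        return new ValueTask<int>(this, _core.Version);
    }

    // Reactor side: called with the CQE result.
    public void Complete(int result) => _core.SetResult(result);

    int IValueTaskSource<int>.GetResult(short token)
    {
        int result = _core.GetResult(token);
        _armed = false;  // result consumed: the source may be armed again
        return result;
    }

    ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token)
        => _core.GetStatus(token);

    void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state,
                                           short token, ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}
```

The guard in Prepare turns a violation of the one-op-per-source rule into an immediate exception instead of a corrupted await. As with any IValueTaskSource-backed ValueTask, each returned task should be awaited exactly once.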

RingSocket - TCP that lives on the ring

CreateTcp(host) makes the descriptor (with TCP_NODELAY set); ConnectAsync(ip, port), SendAsync, and RecvAsync are all ring ops. Even the connect is asynchronous, so a client can dial out - or re-dial after a failure - without ever stalling the reactor. It holds a tx and an rx source, so one send and one recv may overlap. IPv4 literals only: resolve DNS up front, because a blocking resolver call would stall the reactor thread.
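Resolving up front might look like the following, run during startup before any reactor thread is spinning. This is a hedged sketch using only System.Net; StartupDns and ResolveIPv4 are hypothetical helper names, not part of the library.

```csharp
#nullable enable
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;

public static class StartupDns
{
    // Returns an IPv4 literal for `host`. Literals pass through without a lookup;
    // names go through a blocking Dns.GetHostAddresses call, which is why this
    // belongs in startup code and never on a reactor thread.
    public static string ResolveIPv4(string host)
    {
        if (IPAddress.TryParse(host, out IPAddress? literal) &&
            literal.AddressFamily == AddressFamily.InterNetwork)
            return host;

        IPAddress address = Dns.GetHostAddresses(host)
            .FirstOrDefault(a => a.AddressFamily == AddressFamily.InterNetwork)
            ?? throw new InvalidOperationException($"no IPv4 address for {host}");
        return address.ToString();
    }
}
```

The resulting string can then be handed to ConnectAsync(ip, port) from Reactor.OnStart.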

RingPool<T> - one-conversation clients, N at a time

Most protocol connections carry one exchange at a time. The pool turns that into per-reactor concurrency: RentAsync pops an idle resource or queues the renter FIFO; Return hands it to the oldest waiter, whose continuation runs synchronously - an inline return resumes the next handler inline. Seed it by Returning freshly created resources.
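The rent/return behaviour above can be sketched in a few lines. This is an illustration under assumptions, not RingPool<T> itself: PoolSketch<T> is a hypothetical name, it takes no locks because every call is assumed to happen on the owning reactor thread, and a TaskCompletionSource stands in for whatever waiter type the real pool uses.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical single-threaded pool with FIFO waiters.
public sealed class PoolSketch<T> where T : class
{
    private readonly Stack<T> _idle = new();
    private readonly Queue<TaskCompletionSource<T>> _waiters = new();

    public ValueTask<T> RentAsync()
    {
        if (_idle.TryPop(out T? resource))
            return new ValueTask<T>(resource);   // fast path: already complete

        // Default options (no RunContinuationsAsynchronously) let the awaiting
        // continuation run synchronously inside SetResult.
        var waiter = new TaskCompletionSource<T>();
        _waiters.Enqueue(waiter);
        return new ValueTask<T>(waiter.Task);
    }

    public void Return(T resource)
    {
        if (_waiters.TryDequeue(out TaskCompletionSource<T>? waiter))
            waiter.SetResult(resource);          // oldest waiter resumes here, inline
        else
            _idle.Push(resource);
    }
}
```

Seeding is just calling Return once per freshly created resource. If renters were already queued, each Return hands its resource straight to the oldest one.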

Adding your own client

A ring client is three things: a file descriptor, native buffers, and a protocol state machine that alternates submits and awaits. If you already have an io_uring client in another stack, porting it means swapping its submission/completion layer for IRingHost calls - the protocol logic carries over unchanged. The rules:

  1. Open on the right thread. Create the client from Reactor.OnStart so its operations ride that reactor's ring, and register it (or its pool) with reactor.AddService(...). One client per reactor - shared-nothing applies to clients too.
  2. Own native memory. Allocate wire buffers with NativeMemory.Alloc and keep addresses as nint fields so async methods can do arithmetic on them. A buffer must stay valid until its operation completes.
  3. One RingOpSource per concurrent direction. Sequential request/response needs one; full duplex needs two. Never two ops on one source.
  4. Loop your sends and recvs. A send may be short - loop until done. A recv returns whatever arrived - parse incrementally and recv again until your protocol says the message is complete.
  5. Translate errors, mark broken. Negative results are errnos; 0 on recv is EOF. A transport failure should mark the client broken so a pool can discard and replace it - see how PgPool heals.
  6. Never dispose with an op in flight. The completion slot holds a reference to your source; close only from a known-idle state.

Here is a complete minimal Redis client built on those rules - real RESP bytes, real framing, inline resume end to end:

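using System;
using System.IO;                        // IOException
using System.Runtime.InteropServices;   // NativeMemory
using System.Text;                      // Encoding
using System.Threading.Tasks;

public sealed class RedisConnection : IDisposable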
{
    private const int BufferSize = 16 * 1024;

    private readonly RingSocket _socket;
    private readonly nint _send;
    private readonly nint _recv;
    private int _received;

    public bool IsBroken { get; private set; }

    private RedisConnection(RingSocket socket, nint send, nint recv)
    {
        _socket = socket;
        _send = send;
        _recv = recv;
    }

    // Call from Reactor.OnStart - the connect rides this reactor's ring.
    public static async Task<RedisConnection> ConnectAsync(IRingHost host, string ip, ushort port)
    {
        var socket = RingSocket.CreateTcp(host);

        int rc = await socket.ConnectAsync(ip, port);
        if (rc < 0)
        {
            socket.Dispose();
            throw new IOException($"redis connect failed: errno {-rc}");
        }

        nint send, recv;
        unsafe
        {
            send = (nint)NativeMemory.Alloc(BufferSize);
            recv = (nint)NativeMemory.Alloc(BufferSize);
        }

        return new RedisConnection(socket, send, recv);
    }

    // GET key → bulk string (null when the key is missing).
    public async ValueTask<string?> GetAsync(string key)
    {
        int length = WriteCommand("GET", key);          // RESP into _send
        await SendAllAsync(length);

        _received = 0;
        while (true)
        {
            if (_received == BufferSize)                // a zero-length recv would read as EOF
            {
                IsBroken = true;
                throw new IOException("redis reply exceeds the receive buffer");
            }

            int n = await _socket.RecvAsync(_recv + _received, BufferSize - _received);
            if (n <= 0)
            {
                IsBroken = true;
                throw new IOException(n == 0 ? "redis closed" : $"recv errno {-n}");
            }
            _received += n;

            if (TryParseBulkString(out string? value))  // true once the whole reply is buffered
                return value;
            // incomplete reply - recv again
        }
    }

    private async ValueTask SendAllAsync(int length)
    {
        int sent = 0;
        while (sent < length)
        {
            int n = await _socket.SendAsync(_send + sent, length - sent);
            if (n <= 0)
            {
                IsBroken = true;
                throw new IOException($"send errno {-n}");
            }
            sent += n;
        }
    }

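    private unsafe int WriteCommand(string verb, string key)
    {
        // *2\r\n$3\r\nGET\r\n$<len>\r\n<key>\r\n
        // RESP lengths count bytes, so measure and encode the key as UTF-8
        // (matching the UTF-8 decode in TryParseBulkString). The verb is ASCII.
        var buffer = new Span<byte>((void*)_send, BufferSize);
        int keyBytes = Encoding.UTF8.GetByteCount(key);

        // GetBytes throws ArgumentException if the command does not fit in the buffer.
        return Encoding.UTF8.GetBytes(
            $"*2\r\n${verb.Length}\r\n{verb}\r\n${keyBytes}\r\n{key}\r\n", buffer);
    }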

    private unsafe bool TryParseBulkString(out string? value)
    {
        value = null;
        var data = new ReadOnlySpan<byte>((void*)_recv, _received);

        // $<len>\r\n<bytes>\r\n   ($-1\r\n = null)
        if (data.Length < 4) return false;
        if (data[0] != (byte)'$')
        {
            // Not a bulk string (for example a '-' error reply). Waiting for more
            // bytes would never succeed, so fail instead of looping forever.
            IsBroken = true;
            throw new InvalidDataException($"unexpected RESP reply type '{(char)data[0]}'");
        }

        int eol = data.IndexOf((byte)'\r');
        if (eol < 0 || data.Length < eol + 2) return false;

        int len = int.Parse(data[1..eol]);               // UTF-8 span overload (.NET 8+)
        if (len == -1) return true;                      // null reply

        int start = eol + 2;
        if (data.Length < start + len + 2) return false; // body not fully here yet

        value = Encoding.UTF8.GetString(data.Slice(start, len));
        return true;
    }

    public unsafe void Dispose()
    {
        _socket.Dispose();
        NativeMemory.Free((void*)_send);
        NativeMemory.Free((void*)_recv);
    }
}

Wire it exactly like the Postgres pool - open N per reactor, register, rent in handlers:

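reactor.OnStart = r =>
{
    var pool = new RingPool<RedisConnection>();
    for (int i = 0; i < 4; i++)
    {
        // ExecuteSynchronously keeps the Return on the reactor thread; without it
        // ContinueWith queues the continuation to TaskScheduler.Current, which is
        // normally the thread pool. A failed connect is skipped here, not retried.
        _ = RedisConnection.ConnectAsync(r, "127.0.0.1", 6379)
                           .ContinueWith(t => pool.Return(t.Result),
                                         TaskContinuationOptions.OnlyOnRanToCompletion |
                                         TaskContinuationOptions.ExecuteSynchronously);
    }
    r.AddService(pool);
};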

// in the handler:
var pool  = reactor.GetService<RingPool<RedisConnection>>();
var redis = await pool.RentAsync();
try
{
    var value = await redis.GetAsync("hello");
}
finally
{
    pool.Return(redis);
}

Positional clients

For file-like descriptors the shape is even smaller: no connect, no socket - just SubmitRead/SubmitWrite at offsets through a RingOpSource. RingFile (one file) and AssetReader (a pooled buffer reading any cached descriptor) in ioxide.file are the reference implementations - both under 100 lines.
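A positional read can come back short just like a recv, so a caller that needs a whole range still loops, advancing the buffer pointer and the file offset together. The sketch below shows that loop in isolation. It is not RingFile's code: PositionalReads, ReadAt, and ReadFullyAsync are hypothetical names, and the ReadAt delegate stands in for a ring-backed method shaped like the ReadAsync shown earlier.

```csharp
#nullable enable
using System;
using System.IO;
using System.Threading.Tasks;

public static class PositionalReads
{
    // Hypothetical stand-in for a ring-backed positional read:
    // returns bytes read, 0 at end of file, or a negative errno.
    public delegate ValueTask<int> ReadAt(nint buffer, int length, long offset);

    // Fill up to `length` bytes starting at file `offset`, looping over short reads.
    // Returns the number of bytes actually read (less than `length` only at EOF).
    public static async ValueTask<int> ReadFullyAsync(ReadAt read, nint buffer, int length, long offset)
    {
        int total = 0;
        while (total < length)
        {
            int n = await read(buffer + total, length - total, offset + total);
            if (n < 0)
                throw new IOException($"read errno {-n}");
            if (n == 0)
                break;              // EOF before the range was filled
            total += n;
        }
        return total;
    }
}
```

Because each iteration awaits before submitting the next read, the loop never has more than one operation in flight and so needs only a single source.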

Reference implementations

ioxide.pg is the full worked example: ring-native connect and handshake, incremental wire parsing with buffer growth and compaction, server-vs-transport error semantics, and pool-managed healing. Start from it when your protocol has a handshake; start from the Redis sketch above when it doesn't.