Ring clients
The reactor serves HTTP. Everything else a backend talks to - Postgres, Redis, files, an upstream service - is a client riding the same ring, written against one seam. The engine never names a client type, so adding a capability never touches the reactor.
The seam: IRingHost
public interface IRingHost
{
    // Stream I/O: sockets.
    void SubmitConnect(int fd, nint sockaddr, int len, IRingCompletion completion);
    void SubmitSend(int fd, nint buffer, int len, IRingCompletion completion);
    void SubmitRecv(int fd, nint buffer, int len, IRingCompletion completion);

    // Positional I/O: files.
    void SubmitRead(int fd, nint buffer, int len, long offset, IRingCompletion completion);
    void SubmitWrite(int fd, nint buffer, int len, long offset, IRingCompletion completion);
}

public interface IRingCompletion
{
    void Complete(int result); // bytes transferred (0 for connect), or a negative errno
}
Every submission carries its own completion object. The reactor stores it in a slot table,
puts the slot index in the SQE's user_data, and when the CQE lands calls
Complete - inline, on the reactor thread. Routing is per operation, not
per descriptor: two concurrent reads of the same file, or a send racing a recv on one socket,
never collide, and nothing is registered or unbound around descriptor lifetimes.
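The per-operation routing described above can be pictured with a small slot table. This is a hypothetical sketch, not the reactor's actual code: SlotTable, Store, Dispatch, and RecordingCompletion are invented names, and the SQE/CQE plumbing is replaced by handing the slot index around directly.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public interface IRingCompletion
{
    void Complete(int result);
}

// Hypothetical: maps a user_data value to the completion that owns that operation.
public sealed class SlotTable
{
    private IRingCompletion?[] _slots = new IRingCompletion?[4];
    private readonly Stack<int> _free = new();
    private int _next;

    // Called at submit time; the returned value would go into the SQE's user_data.
    public ulong Store(IRingCompletion completion)
    {
        int index;
        if (_free.Count > 0)
        {
            index = _free.Pop();
        }
        else
        {
            if (_next == _slots.Length) Array.Resize(ref _slots, _slots.Length * 2);
            index = _next++;
        }
        _slots[index] = completion;
        return (ulong)index;
    }

    // Called when a CQE lands: release the slot, then complete inline.
    public void Dispatch(ulong userData, int result)
    {
        int index = (int)userData;
        IRingCompletion completion = _slots[index]
            ?? throw new InvalidOperationException($"slot {index} is empty");
        _slots[index] = null;
        _free.Push(index);
        completion.Complete(result);
    }
}

// Hypothetical completion that just remembers its result.
public sealed class RecordingCompletion : IRingCompletion
{
    public int? Result { get; private set; }
    public void Complete(int result) => Result = result;
}
```

Because the key is the slot and not the descriptor, two operations on the same fd simply occupy two slots and complete independently.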
- Buffers cross as nint - raw pointers can't survive an await; an address can. Buffers must be native (or pinned) memory that outlives the operation.
- Any thread may submit - off-reactor calls are marshalled to the reactor through its wake queue automatically.
- Results follow io_uring's convention - bytes transferred on success, 0 for a successful connect (or EOF on recv), a negative errno on failure. Clients translate these into their own error semantics.
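One way a client might translate raw results into its own error semantics is a pair of small helpers. This is a sketch under assumptions: RingResult, ThrowIfFailed, and ThrowIfFailedOrEof are hypothetical names, and the choice of IOException and EndOfStreamException is illustrative, not something the library prescribes.

```csharp
using System;
using System.IO;

// Hypothetical helpers: turn a raw ring result into a byte count or an exception.
public static class RingResult
{
    // For connect, send, read, and write: negative means -errno.
    public static int ThrowIfFailed(int result, string operation)
    {
        if (result >= 0) return result;
        int errno = -result;
        throw new IOException($"{operation} failed: errno {errno}");
    }

    // For recv: 0 additionally means the peer closed the connection.
    public static int ThrowIfFailedOrEof(int result, string operation)
    {
        if (result == 0)
            throw new EndOfStreamException($"{operation}: peer closed the connection");
        return ThrowIfFailed(result, operation);
    }
}
```

A client would typically also mark itself broken before letting such an exception propagate, so that a pool can discard it.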
The kit
RingOpSource - one CQE becomes one await
private readonly RingOpSource _op = new();

public ValueTask<int> ReadAsync(int fd, nint buffer, int length, long offset)
{
    var pending = _op.Prepare();                        // arm - zero allocation
    _host.SubmitRead(fd, buffer, length, offset, _op);  // the source IS the completion
    return pending;                                     // resumes inline on the CQE
}
It implements both IRingCompletion and IValueTaskSource<int>
and is reusable across operations. One op in flight per source at a time - a duplex client
owns two (one per direction).
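To make the "one CQE becomes one await" idea concrete, here is a minimal sketch of a reusable source built on the BCL's ManualResetValueTaskSourceCore<int>. SketchOpSource is a hypothetical stand-in; whether RingOpSource is implemented this way is an assumption, and the real type may differ.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

public interface IRingCompletion
{
    void Complete(int result);
}

// Hypothetical stand-in for RingOpSource: one object is both the completion
// the ring calls and the awaitable the client awaits.
public sealed class SketchOpSource : IRingCompletion, IValueTaskSource<int>
{
    // Mutable struct: the field must not be readonly.
    private ManualResetValueTaskSourceCore<int> _core;

    // Arm for the next operation. Reset bumps the version token, so a stale
    // ValueTask from a previous operation can no longer be consumed.
    public ValueTask<int> Prepare()
    {
        _core.Reset();
        return new ValueTask<int>(this, _core.Version);
    }

    // Called when the CQE lands; a registered continuation runs inline here
    // because RunContinuationsAsynchronously is left at its default (false).
    public void Complete(int result) => _core.SetResult(result);

    public int GetResult(short token) => _core.GetResult(token);

    public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);

    public void OnCompleted(Action<object?> continuation, object? state, short token,
        ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}
```

The single-op rule follows directly from this shape: calling Prepare again before the previous result is consumed would reset state that an awaiter still depends on.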
RingSocket - TCP that lives on the ring
CreateTcp(host) makes the descriptor (with TCP_NODELAY);
ConnectAsync(ip, port), SendAsync, and RecvAsync are
all ring ops. Even the connect is asynchronous, so a client can dial out - or re-dial after a
failure - without ever stalling the reactor. It holds a tx and an rx source, so one send and
one recv may overlap. IPv4 literals only: resolve DNS up front, because a resolver call
would block the reactor thread.
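Resolving up front could look like the following sketch, run once during startup before any reactor is serving. StartupResolver and ResolveIPv4 are hypothetical names; the lookup uses the standard System.Net.Dns API and blocks, which is exactly why it belongs outside the reactor.

```csharp
#nullable enable
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper: call during startup, never from a handler.
public static class StartupResolver
{
    // Returns an IPv4 literal suitable for passing to ConnectAsync(ip, port).
    public static string ResolveIPv4(string host)
    {
        // Already an address literal: accept it only if it is IPv4.
        if (IPAddress.TryParse(host, out IPAddress? literal))
        {
            if (literal.AddressFamily != AddressFamily.InterNetwork)
                throw new ArgumentException($"{host} is not an IPv4 address", nameof(host));
            return literal.ToString();
        }

        // Blocking DNS lookup; pick the first IPv4 result.
        foreach (IPAddress candidate in Dns.GetHostAddresses(host))
        {
            if (candidate.AddressFamily == AddressFamily.InterNetwork)
                return candidate.ToString();
        }
        throw new InvalidOperationException($"no IPv4 address found for {host}");
    }
}
```

The resolved literal is then captured by the OnStart callback, so re-dials after a failure reuse it without touching the resolver.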
RingPool<T> - one-conversation clients, N at a time
Most protocol connections carry one exchange at a time. The pool turns that into
per-reactor concurrency: RentAsync pops an idle resource or queues the renter
FIFO; Return hands it to the oldest waiter, whose continuation runs synchronously
- an inline return resumes the next handler inline. Seed it by Returning freshly
created resources.
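The rent/return behaviour described above can be sketched in a few lines. SketchPool<T> is hypothetical and deliberately simplified: it is single-threaded (one pool per reactor), and it allocates a TaskCompletionSource per waiter, which the real RingPool<T> may well avoid.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical single-threaded pool; not RingPool<T>'s actual source.
public sealed class SketchPool<T>
{
    private readonly Stack<T> _idle = new();
    private readonly Queue<TaskCompletionSource<T>> _waiters = new();

    // Pop an idle resource, or queue the renter in FIFO order.
    public ValueTask<T> RentAsync()
    {
        if (_idle.Count > 0) return new ValueTask<T>(_idle.Pop());

        // Default TaskCompletionSource options: SetResult may run an awaiting
        // continuation synchronously, which gives the inline hand-off.
        var waiter = new TaskCompletionSource<T>();
        _waiters.Enqueue(waiter);
        return new ValueTask<T>(waiter.Task);
    }

    // Hand the resource to the oldest waiter, or park it as idle.
    public void Return(T resource)
    {
        if (_waiters.Count > 0) _waiters.Dequeue().SetResult(resource);
        else _idle.Push(resource);
    }
}
```

Seeding works the same way as in the text: calling Return on a freshly created resource with no waiters parks it as idle.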
Adding your own client
A ring client is three things: a file descriptor, native buffers, and a
protocol state machine that alternates submits and awaits. If you already have an io_uring
client in another stack, porting it means swapping its submission/completion layer for
IRingHost calls - the protocol logic carries over unchanged. The rules:
- Open on the right thread. Create the client from Reactor.OnStart so its operations ride that reactor's ring, and register it (or its pool) with reactor.AddService(...). Each reactor gets its own client or pool - shared-nothing applies to clients too.
- Own native memory. Allocate wire buffers with NativeMemory.Alloc and keep addresses as nint fields so async methods can do arithmetic on them. A buffer must stay valid until its operation completes.
- One RingOpSource per concurrent direction. Sequential request/response needs one; full duplex needs two. Never two ops on one source.
- Loop your sends and recvs. A send may be short - loop until done. A recv returns whatever arrived - parse incrementally and recv again until your protocol says the message is complete.
- Translate errors, mark broken. Negative results are errnos; 0 on recv is EOF. A transport failure should mark the client broken so a pool can discard and replace it - see how PgPool heals.
- Never dispose with an op in flight. The completion slot holds a reference to your source; close only from a known-idle state.
Here is a complete minimal Redis client built on those rules - real RESP bytes, real framing, inline resume end to end:
using System;
using System.IO;
using System.Runtime.InteropServices; // NativeMemory
using System.Text;
using System.Threading.Tasks;

public sealed class RedisConnection : IDisposable
{
    private const int BufferSize = 16 * 1024;

    private readonly RingSocket _socket;
    private readonly nint _send;
    private readonly nint _recv;
    private int _received;

    public bool IsBroken { get; private set; }

    private RedisConnection(RingSocket socket, nint send, nint recv)
    {
        _socket = socket;
        _send = send;
        _recv = recv;
    }

    // Call from Reactor.OnStart - the connect rides this reactor's ring.
    public static async Task<RedisConnection> ConnectAsync(IRingHost host, string ip, ushort port)
    {
        var socket = RingSocket.CreateTcp(host);
        int rc = await socket.ConnectAsync(ip, port);
        if (rc < 0)
        {
            socket.Dispose();
            throw new IOException($"redis connect failed: errno {-rc}");
        }

        nint send, recv;
        unsafe
        {
            send = (nint)NativeMemory.Alloc(BufferSize);
            recv = (nint)NativeMemory.Alloc(BufferSize);
        }
        return new RedisConnection(socket, send, recv);
    }

    // GET key → bulk string (null when the key is missing).
    public async ValueTask<string?> GetAsync(string key)
    {
        int length = WriteCommand("GET", key); // RESP into _send
        await SendAllAsync(length);

        _received = 0;
        while (true)
        {
            if (_received == BufferSize)
            {
                // The reply does not fit the fixed buffer; the stream position is now unknown.
                IsBroken = true;
                throw new IOException("redis reply exceeds the receive buffer");
            }

            int n = await _socket.RecvAsync(_recv + _received, BufferSize - _received);
            if (n <= 0)
            {
                IsBroken = true;
                throw new IOException(n == 0 ? "redis closed" : $"recv errno {-n}");
            }
            _received += n;

            if (TryParseBulkString(out string? value)) // complete reply per RESP framing
                return value;
            // incomplete reply - recv again
        }
    }

    private async ValueTask SendAllAsync(int length)
    {
        int sent = 0;
        while (sent < length) // a send may be short - loop until done
        {
            int n = await _socket.SendAsync(_send + sent, length - sent);
            if (n <= 0)
            {
                IsBroken = true;
                throw new IOException(n == 0 ? "send made no progress" : $"send errno {-n}");
            }
            sent += n;
        }
    }

    // Writes the command into _send and returns its length. ASCII-only keys:
    // the declared length is a char count, which equals the byte count only for ASCII.
    private unsafe int WriteCommand(string verb, string key)
    {
        // *2\r\n$3\r\nGET\r\n$<len>\r\n<key>\r\n
        var buffer = new Span<byte>((void*)_send, BufferSize);
        return Encoding.ASCII.GetBytes(
            $"*2\r\n${verb.Length}\r\n{verb}\r\n${key.Length}\r\n{key}\r\n", buffer);
    }

    // Returns true once a whole bulk-string reply is in _recv; false means "recv more".
    private unsafe bool TryParseBulkString(out string? value)
    {
        value = null;
        var data = new ReadOnlySpan<byte>((void*)_recv, _received);

        // $<len>\r\n<bytes>\r\n ($-1\r\n = null)
        if (data.Length < 4) return false;
        if (data[0] != (byte)'$')
        {
            // Error or other reply type: this sketch only understands bulk strings.
            IsBroken = true;
            throw new InvalidDataException("unexpected RESP reply type");
        }

        int eol = data.IndexOf((byte)'\r');
        if (eol < 0 || data.Length < eol + 2) return false;

        int len = int.Parse(data[1..eol]); // UTF-8 span overload (.NET 8+)
        if (len == -1) return true;        // null reply

        int start = eol + 2;
        if (data.Length < start + len + 2) return false; // body not fully here yet

        value = Encoding.UTF8.GetString(data.Slice(start, len));
        return true;
    }

    // Only call from a known-idle state: no send or recv may be in flight.
    public unsafe void Dispose()
    {
        _socket.Dispose();
        NativeMemory.Free((void*)_send);
        NativeMemory.Free((void*)_recv);
    }
}
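The client above encodes exactly one command shape. As a possible next step, a general RESP command encoder might look like the sketch below, which writes any argument list as an array of bulk strings and uses UTF-8 byte counts for the length prefixes. RespEncoder is a hypothetical helper, not part of the library; it writes into any Span<byte>, so it would work over the native send buffer as well as a managed array.

```csharp
using System;
using System.Text;

// Hypothetical helper: encode a command as a RESP array of bulk strings.
public static class RespEncoder
{
    // Returns the number of bytes written. Throws ArgumentException if the
    // destination is too small (the behaviour of Encoding.GetBytes on spans).
    public static int WriteCommand(Span<byte> destination, params string[] args)
    {
        int p = 0;

        // Array header: *<argument count>\r\n
        p += Encoding.ASCII.GetBytes($"*{args.Length}\r\n", destination[p..]);

        foreach (string arg in args)
        {
            // Bulk string: $<byte length>\r\n<bytes>\r\n
            // The length is the UTF-8 byte count, not the char count.
            int byteCount = Encoding.UTF8.GetByteCount(arg);
            p += Encoding.ASCII.GetBytes($"${byteCount}\r\n", destination[p..]);
            p += Encoding.UTF8.GetBytes(arg, destination[p..]);
            p += Encoding.ASCII.GetBytes("\r\n", destination[p..]);
        }
        return p;
    }
}
```

For example, WriteCommand(buffer, "SET", "greeting", "héllo") declares the last argument as 6 bytes, because é takes two bytes in UTF-8.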
Wire it exactly like the Postgres pool - open N per reactor, register, rent in handlers:
reactor.OnStart = r =>
{
    var pool = new RingPool<RedisConnection>();
    for (int i = 0; i < 4; i++)
    {
        _ = RedisConnection.ConnectAsync(r, "127.0.0.1", 6379)
            .ContinueWith(t => pool.Return(t.Result),
                TaskContinuationOptions.OnlyOnRanToCompletion);
    }
    r.AddService(pool);
};

// in the handler:
var pool = reactor.GetService<RingPool<RedisConnection>>();
var redis = await pool.RentAsync();
try
{
    var value = await redis.GetAsync("hello");
}
finally
{
    pool.Return(redis);
}
Positional clients
For file-like descriptors the shape is even smaller: no connect, no socket - just
SubmitRead/SubmitWrite at offsets through a
RingOpSource. RingFile (one file) and AssetReader (a
pooled buffer reading any cached descriptor) in ioxide.file are the reference
implementations - both under 100 lines.
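The core of a positional client is a loop over short reads at advancing offsets. The sketch below shows that loop in isolation; ReadAt and PositionalReads are hypothetical names, and the delegate stands in for a method that submits SubmitRead through a RingOpSource and returns the awaited result.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical shape of one positional read: bytes read, 0 at EOF, or a negative errno.
public delegate ValueTask<int> ReadAt(nint buffer, int length, long offset);

public static class PositionalReads
{
    // Fills up to `length` bytes starting at `offset`, looping over short reads.
    // Returns the bytes actually read, which is less than `length` only at EOF.
    public static async ValueTask<int> ReadFullyAsync(ReadAt read, nint buffer, int length, long offset)
    {
        int total = 0;
        while (total < length)
        {
            // Advance both the destination address and the file offset.
            int n = await read(buffer + total, length - total, offset + total);
            if (n < 0) throw new IOException($"read failed: errno {-n}");
            if (n == 0) break; // EOF
            total += n;
        }
        return total;
    }
}
```

The buffer stays an nint throughout, so the address arithmetic is legal inside the async method and the memory only has to outlive each individual read.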
Reference implementations
ioxide.pg is the full worked example: ring-native connect and handshake,
incremental wire parsing with buffer growth and compaction, server-vs-transport error
semantics, and pool-managed healing. Start from it when your protocol has a handshake;
start from the Redis sketch above when it doesn't.