In part 2 we introduced an asynchronous API for reading data from the wire, where only the number of received bytes was considered. On this part 3 let's extend it to access the actual received data.
As usual, the entire source code can be found at Minima
Data will be pushed to the CQ shared ring buffers by the kernel whenever data arrives from the wire, this can represent a partial, complete or more than one request. By request I mean it could be a HTTP/1.1, HTTP/2, gRPC, websocket, etc pretty much a request from any protocol. Whenever we call a ReadAsync, we receive a RecvSnapshot, a frozen view of the CQE metadata accumulated so far, currently this metadata only includes the number of bytes from each CQE, we need to add a byte* pointing to where the data is.
public struct Item
{
public byte* Ptr; // new
public ushort Bid;
public int Len;
public bool HasBuffer;
public ReadOnlySpan<byte> AsSpan() => new(Ptr, Len); // new
}
then on the reactor's dispatch receive branch
else if (kind == KindRecv)
{
bool hasBuf = (cqe.flags & IORING_CQE_F_BUFFER) != 0;
ushort bid = hasBuf ? (ushort)(cqe.flags >> IORING_CQE_BUFFER_SHIFT) : (ushort)0;
(...)
byte* ptr = hasBuf ? _bufSlab + (nuint)bid * (nuint)BufferSize : null;
conn.Complete(cqe.res, bid, hasBuf, ptr);
(...)
}
hasBuf - True when the kernel attached a provided buffer to this completion.
bid - the index of the buffer slot the kernel picked from the provided-buffer ring for this recv.
_bufSlab - The contiguous unmanaged memory block backing every recv buffer the kernel can DMA into for this reactor.
So ptr points to the "slot" which can be calculated by knowing the buffer Id and the size of each buffer. These slots are contiguous memory allocated during initialization.
conn.Complete adds a new Item to our SPSC ring. In case you don't remember previous parts, each CQE has a "kind", kind==KindRecv means that this CQE signals data was received from the wire.
Now for each received CQE we can access the byte* where kernel stored the received data, each ReadAsync will return a snapshot that contains one or more Items, each Item contains the metadata for one CQE. On the handler side we must consume this data. We don't want to be dealing with pointers though, that would force us to use unsafe everywhere we touch the received data.
We already have the ReadOnlySpan view of the data via the AsSpan() but spans are ref structs and can't be freely used anywhere. Why is that? Spans are used to create views over stack allocated data unlike its heap allocated counterpart Memory/ReadOnlyMemory, we can't directly use ReadOnlyMemory because it can't be directly created from a byte* even though this byte* points at unmanaged memory allocated upfront in each reactor's _bufSlab.
Enter UnmanagedMemoryManager
internal sealed unsafe class UnmanagedMemoryManager : MemoryManager<byte>
{
private readonly byte* _ptr;
private readonly int _length;
public ushort BufferId { get; }
public UnmanagedMemoryManager(byte* ptr, int length, ushort bufferId)
{
_ptr = ptr;
_length = length;
BufferId = bufferId;
}
public override Span<byte> GetSpan() => new(_ptr, _length);
public override MemoryHandle Pin(int elementIndex = 0) => new(_ptr + elementIndex);
public override void Unpin() { }
protected override void Dispose(bool disposing) { }
}
Similar to Span and Memory, UnmanagedMemoryManager is a view over memory, the abstract method implementations are just ceremony, the data already comes pinned by default. Pin is essentially a struct construction, zero cost at runtime. It exists purely to satisfy the contract. Dispose is empty because the manager is a view, not an owner, the buffer's lifetime is managed by BufferId.
The actual cost is creating a new UnmanagedMemoryManager for each Item, this can also be avoided by pre-allocating all the possible UnmanagedMemoryManagers. Every bid maps to a fixed address in the slab: _bufSlab + bid * BufferSize. The pointer for a given bid never changes. Only Len varies per recv. So we can pre-allocate one manager per slot at reactor init and reuse it forever.
UnmanagedMemoryManager is the bridge between safe and unsafe code, by inheriting from MemoryManager it can be exposed as Memory and plugged into the entire BCL ecosystem for free:
- PipeReader / PipeWriter
- Stream.ReadAsync(Memory) / WriteAsync(ReadOnlyMemory)
- ReadOnlySequence (built from ReadOnlyMemory segments)
- IBufferWriter
- Any async API that takes Memory
This is especially useful for ReadOnlySequence which is very handy when dealing with TCP fragmentation.
This is how zero allocation is achieved on the receiving branch, the data is written to pre-allocated slots and we directly read from them. While this is zero copy in user space, the kernel itself still copies the bytes from the socket receive buffer into _bufSlab[bid], avoiding that copy requires different mechanisms (io_uring zcrx) which are outside Part 3's scope. This data can be parsed by slicing over it and is valid until we return the CQE's buffer Id as can be seen in the snippet below through reactor.ReturnBuffer. After returning each buffer, the kernel may reuse that "slot" for new incoming data so its data can be invalid/overwritten.
So, how does the handler look now?
public static async Task HandleAsync(Reactor reactor, int fd, Connection conn)
{
try
{
while (true)
{
RecvSnapshot snap = await conn.ReadAsync();
while (conn.TryGetItem(snap, out SpscRecvRing.Item item))
{
if (item.HasBuffer)
{
UnmanagedMemoryManager mem = item.AsMemoryManager();
ReadOnlyMemory<byte> data = mem.Memory;
// data is now usable with any BCL Memory<byte>/async API
_ = data.Length;
reactor.ReturnBuffer(mem.BufferId);
}
conn.QueueResponse(fd);
}
if (snap.IsClosed)
{
conn.Close(fd);
return;
}
conn.ResetRead();
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"[r{reactor.Id}] handler crash on fd={fd}: {ex}");
conn.Close(fd);
}
}
The possibilities are now endless, we can build a ReadOnlySequence from all the data to facilitate slicing across multiple segments, also in the case of incomplete requests we can again create a ReadOnlySequence, call another ReadAsync and add the received segments to the already existing ReadOnlySequence. We'll put this to use in Part 4 when we parse a real HTTP request that spans multiple segments.