ConnectionStream
ConnectionStream is a thin Stream adapter that bridges zerg’s native connection API to BCL APIs that expect a System.IO.Stream. Writes are copied once into the connection’s write slab, with no copy beyond what the native Write() path already performs, and reads cost a single copy into the caller’s buffer.
Class Definition
public sealed class ConnectionStream : Stream
Constructor
public ConnectionStream(Connection inner)
Wraps an existing Connection in a Stream interface. The underlying connection must already be registered with a reactor.
Supported Operations
| Property | Value | Description |
|---|---|---|
| CanRead | true | Reading is supported via reactor receive rings |
| CanWrite | true | Writing appends to the connection’s unmanaged slab |
| CanSeek | false | Network streams are not seekable |
Write Operations
Write(byte[], int, int)
public override void Write(byte[] buffer, int offset, int count)
Synchronous write. Validates parameters and copies the buffer slice into the connection’s unmanaged write slab.
WriteAsync(ReadOnlyMemory<byte>, CancellationToken)
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
Fast async write with no allocation and no implicit flush. Returns a completed ValueTask immediately if the buffer is empty. Otherwise it copies the data into the write slab synchronously and returns a completed ValueTask.
Note: This does not flush. Call FlushAsync() to send data.
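Because writes only stage data, several writes can share one flush. The following is a minimal sketch written against the base Stream type, so it runs with any stream. FrameWriter and WriteFrameAsync are hypothetical names, and the 4-byte length prefix is an illustrative framing choice, not something zerg prescribes.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class FrameWriter
{
    // Hypothetical helper: stages a 4-byte big-endian length prefix and the
    // payload with two writes, then flushes once.
    public static async Task WriteFrameAsync(
        Stream stream, ReadOnlyMemory<byte> payload, CancellationToken ct = default)
    {
        var header = new byte[4];
        BinaryPrimitives.WriteInt32BigEndian(header, payload.Length);

        await stream.WriteAsync(header, ct);   // staged, not yet flushed
        await stream.WriteAsync(payload, ct);  // staged, not yet flushed
        await stream.FlushAsync(ct);           // a single flush covers both writes
    }
}
```

With a ConnectionStream, the two WriteAsync calls would only append to the slab, and the single FlushAsync call is what asks the reactor to send.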
Flush
FlushAsync(CancellationToken)
public override Task FlushAsync(CancellationToken token)
Flushes all previously written data. Delegates to connection.FlushAsync().AsTask(). The reactor controls the actual send; the returned Task completes when all staged bytes have been transmitted.
Flush()
public override void Flush()
Throws NotSupportedException. Synchronous flush is not supported. Use FlushAsync().
Read Operations
ReadAsync(Memory<byte>, CancellationToken)
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
Reads from the reactor receive rings and copies into destination.
Steps:
- Awaits the next receive snapshot from the reactor (connection.ReadAsync())
- Returns 0 (EOF) if the connection is closed
- Gathers all segments belonging to the snapshot
- Copies once into the caller’s buffer via CopyFromRings()
- Returns each ring buffer to the reactor pool
- Calls ResetRead() to prepare for the next cycle
- Returns the number of bytes copied
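The gather-and-copy step can be pictured as follows. This is a sketch of the idea only: SegmentCopy.CopySegments and its list-of-segments input are hypothetical stand-ins, and zerg’s actual CopyFromRings() signature and ring-buffer types are not shown in this document. The sketch stops copying when the destination is full; what the real implementation does with bytes that do not fit is not stated here.

```csharp
using System;
using System.Collections.Generic;

public static class SegmentCopy
{
    // Hypothetical illustration: copies the segments, in order, into the
    // destination until either the segments or the destination run out.
    // Returns the number of bytes copied, which is what a read would report.
    public static int CopySegments(
        IReadOnlyList<ReadOnlyMemory<byte>> segments, Memory<byte> destination)
    {
        int copied = 0;
        foreach (var segment in segments)
        {
            int room = destination.Length - copied;
            if (room == 0) break; // destination is full

            int n = Math.Min(segment.Length, room);
            segment.Span.Slice(0, n).CopyTo(destination.Span.Slice(copied));
            copied += n;
        }
        return copied;
    }
}
```

Each byte moves exactly once, from its source segment to the caller’s buffer, which is the "single copy" the read path refers to.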
ReadAsync(byte[], int, int, CancellationToken)
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Legacy array-based read. Delegates to the Memory<byte> overload.
Read(byte[], int, int)
public override int Read(byte[] buffer, int offset, int count)
Throws NotSupportedException. Synchronous reads are not supported on an async reactor-driven connection.
Unsupported Operations
| Method | Behavior |
|---|---|
| Read(byte[], int, int) | Throws NotSupportedException |
| Flush() | Throws NotSupportedException |
| Seek(long, SeekOrigin) | Throws NotSupportedException |
| SetLength(long) | Throws NotSupportedException |
| Length (property) | Throws NotSupportedException |
| Position (property) | Throws NotSupportedException |
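Code that accepts an arbitrary Stream can avoid the seek-related exceptions by checking CanSeek before touching Length or Position. A minimal sketch, where StreamProbe and TryGetLength are hypothetical names introduced for illustration:

```csharp
using System.IO;

public static class StreamProbe
{
    // Hypothetical helper: returns the length for seekable streams and null
    // for non-seekable ones, instead of letting Length throw.
    public static long? TryGetLength(Stream stream) =>
        stream.CanSeek ? stream.Length : (long?)null;
}
```

Since CanSeek is false for ConnectionStream, a helper like this would return null for it rather than triggering NotSupportedException.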
Disposal
protected override void Dispose(bool disposing)
Idempotent disposal using an interlocked guard. Disposes the underlying connection. Safe to call multiple times.
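An interlocked guard of this kind is commonly written with Interlocked.Exchange. The sketch below shows the general pattern on a hypothetical GuardedStream that wraps any IDisposable; it is not zerg’s source, and the field names are assumptions.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical stand-in used to illustrate the guard; not zerg's ConnectionStream.
public sealed class GuardedStream : Stream
{
    private readonly IDisposable _inner;
    private int _disposed; // 0 = live, 1 = disposed

    public GuardedStream(IDisposable inner) => _inner = inner;

    protected override void Dispose(bool disposing)
    {
        // Exchange returns the previous value, so only the first caller sees 0.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        if (disposing) _inner.Dispose();
        base.Dispose(disposing);
    }

    // Remaining abstract Stream members, stubbed out to keep the sketch short.
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

The guard matters because Stream.Dispose() can be invoked more than once, for example by an explicit call followed by a using block, and each invocation reaches Dispose(bool).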
Usage Example
static async Task HandleWithStreamAsync(Connection connection)
{
    // Disposing the stream also disposes the underlying connection.
    await using var stream = new ConnectionStream(connection);
    var buffer = new byte[4096];
    int bytesRead;
    // ReadAsync returns 0 once the connection is closed.
    while ((bytesRead = await stream.ReadAsync(buffer)) > 0)
    {
        // Process data in buffer[..bytesRead]
        // Echo back
        await stream.WriteAsync(buffer.AsMemory(0, bytesRead));
        // WriteAsync only stages data; FlushAsync sends it.
        await stream.FlushAsync();
    }
}
When to Use ConnectionStream
Use ConnectionStream when you need to integrate with APIs that require Stream:
- System.Text.Json serialization/deserialization
- StreamReader/StreamWriter for text protocols
- Third-party libraries that accept Stream
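As an illustration of the StreamReader/StreamWriter case, here is a minimal line-oriented sketch written against the base Stream type. LineEcho and EchoUpperAsync are hypothetical names. The method takes separate input and output parameters so it can be exercised with ordinary streams; with a ConnectionStream, both would be the same instance. It uses only asynchronous members, because the synchronous Read and Flush throw NotSupportedException.

```csharp
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class LineEcho
{
    // Hypothetical helper: reads lines until end of stream and writes each
    // one back in upper case, flushing after every line.
    public static async Task EchoUpperAsync(Stream input, Stream output)
    {
        var utf8 = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);

        // leaveOpen: true keeps the adapters from disposing the underlying stream.
        using var reader = new StreamReader(
            input, utf8, detectEncodingFromByteOrderMarks: false, bufferSize: 4096, leaveOpen: true);
        // await using disposes the writer through its asynchronous path.
        await using var writer = new StreamWriter(
            output, utf8, bufferSize: 4096, leaveOpen: true) { NewLine = "\n" };

        string? line;
        while ((line = await reader.ReadLineAsync()) is not null)
        {
            await writer.WriteLineAsync(line.ToUpperInvariant());
            await writer.FlushAsync(); // pushes the line through to the stream
        }
    }
}
```

The loop ends when ReadLineAsync returns null, which for a ConnectionStream corresponds to ReadAsync returning 0 after the connection closes.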
For zero-copy reads with partial consumption support, prefer ConnectionPipeReader instead. For maximum performance, prefer the native Connection API directly (ReadAsync/Write/FlushAsync). ConnectionStream adds one copy on reads (from kernel buffers into your destination buffer) and wraps ValueTask as Task for FlushAsync.
Design Notes
- No internal buffering: Reads pull directly from reactor rings; writes go directly to the slab
- No synchronization: The reactor provides exclusivity guarantees
- Single copy on read: Data is copied from kernel-provided buffers into the caller’s buffer
- No extra copy on write: Data is copied once into the slab (same as native Write())