Abstract
Pipelines are designed to simplify high-performance I/O in .NET.
Pipelines are available in the System.IO.Pipelines NuGet package.
Pipe
Pipe is used to create a PipeReader/PipeWriter pair, accessible via properties on the Pipe. All data written to the PipeWriter is available in the PipeReader:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
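To make the relationship between the two ends concrete, here is a minimal round-trip sketch. The class and method names (PipeRoundTrip, EchoAsync) are hypothetical and chosen only for illustration. It assumes the message is small enough that a single flush does not hit the default pause threshold.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipeRoundTrip
{
    // Hypothetical helper: writes a string into a Pipe and reads it back from the other end.
    public static async Task<string> EchoAsync(string message)
    {
        var pipe = new Pipe();

        // WriteAsync copies the bytes into the pipe and flushes them to the reader.
        await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
        // Completing the writer tells the reader that no more data is coming.
        await pipe.Writer.CompleteAsync();

        // Everything was flushed before this call, so one read sees all of it.
        ReadResult result = await pipe.Reader.ReadAsync();
        string text = Encoding.UTF8.GetString(result.Buffer.ToArray());

        // Mark the whole buffer as consumed, then release the reader.
        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Reader.CompleteAsync();
        return text;
    }
}
```

Calling `await PipeRoundTrip.EchoAsync("hello")` should hand back the same string that was written.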
async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);
    await Task.WhenAll(reading, writing);
}
// Reads from the Socket and writes to the PipeWriter:
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;
    while (true)
    {
        // Allocate at least 512 bytes of memory from the Writer:
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket and written to the buffer:
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }
        // Make the data available to the PipeReader:
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted)
        {
            break;
        }
    }
    // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}
// Reads from the PipeReader:
async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        // This buffer holds the data that was read:
        ReadOnlySequence<byte> buffer = result.Buffer;
        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }
        // Tell the PipeReader how much of the buffer has been consumed and examined:
        reader.AdvanceTo(buffer.Start, buffer.End);
        // True if EOF reached:
        if (result.IsCompleted)
        {
            break;
        }
    }
    // Mark the PipeReader as complete and release the allocated memory:
    await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for an EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');
    if (position == null)
    {
        line = default;
        return false;
    }
    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}
PipeReader and PipeWriter
See Notes on PipeReader and Notes on PipeWriter.
Best Practices for using PipeReader and PipeWriter
- Always complete the PipeReader and PipeWriter or throw an exception.
- Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync.
- Periodically await PipeWriter.FlushAsync while writing.
  - Always check FlushResult.IsCompleted.
    - Abort writing if IsCompleted is true (since the reader has completed and is no longer listening).
- Always call PipeWriter.FlushAsync after writing something you want the PipeReader to access.
- Do not call FlushAsync if the reader cannot start until FlushAsync finishes. This may cause a deadlock.
- Do not access ReadResult.Buffer after calling AdvanceTo or completing the PipeReader.
Caution: These types are not thread safe.
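Several of the writer-side practices above can be combined in one loop. The following is a sketch only: FlushChecks and WriteChunksAsync are hypothetical names, and the chunked input is an assumption made for illustration. It flushes after each write, stops when FlushResult.IsCompleted is true, and always completes the PipeWriter, passing the exception along when one occurs.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class FlushChecks
{
    // Hypothetical helper: writes each chunk, flushing after every write.
    // Returns the number of chunks flushed while the reader was still listening.
    public static async Task<int> WriteChunksAsync(PipeWriter writer, IEnumerable<byte[]> chunks)
    {
        int flushed = 0;
        try
        {
            foreach (byte[] chunk in chunks)
            {
                // Copy the chunk into memory owned by the PipeWriter.
                Memory<byte> memory = writer.GetMemory(chunk.Length);
                chunk.CopyTo(memory);
                writer.Advance(chunk.Length);

                // Make the data available to the PipeReader.
                FlushResult result = await writer.FlushAsync();
                if (result.IsCompleted)
                {
                    // The reader has completed; nobody is listening any more.
                    break;
                }
                flushed++;
            }
        }
        catch (Exception ex)
        {
            // Complete with the exception so the reader can observe the failure.
            await writer.CompleteAsync(ex);
            throw;
        }

        // Normal completion: no more data is coming.
        await writer.CompleteAsync();
        return flushed;
    }
}
```

Returning the count is a design choice made for this sketch so that a caller can tell whether the reader went away early.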
Backpressure and Flow Control
When reading and parsing:
- The reading thread consumes data from the network and puts it in buffers.
- The parsing thread constructs data structures.
Parsing takes more time than reading, so the reading thread gets ahead of the parsing thread and must either:
- Pause, or
- Allocate more memory to store the data for the parsing thread.
Pausing the reading thread so that the parsing thread can catch up is backpressure.
The Pipe has two settings that adjust flow control:
- PauseWriterThreshold: determines how much data should be buffered before calls to PipeWriter.FlushAsync are paused.
- ResumeWriterThreshold: determines how much data the reader has to consume before calls to PipeWriter.FlushAsync resume.
When the amount of data in the Pipe crosses the PauseWriterThreshold, PipeWriter.FlushAsync returns an incomplete ValueTask<FlushResult>.
When the amount of data becomes lower than ResumeWriterThreshold, PipeWriter.FlushAsync completes the ValueTask<FlushResult>.
Example
// FlushAsync returns incomplete tasks once at least 10 bytes are buffered, and resumes once the reader brings the buffered amount below 5 bytes:
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
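The effect of these thresholds can be observed directly. In the sketch below, BackpressureDemo and its method are hypothetical names, and writing 16 bytes is an arbitrary choice that exceeds the pause threshold. The flush is started without awaiting it, so the code can check that it is still pending before the reader consumes anything.

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Hypothetical helper: returns true if the flush was pending until the reader consumed data.
    public static async Task<bool> WriterPausesUntilReaderConsumesAsync()
    {
        var options = new PipeOptions(
            pauseWriterThreshold: 10,
            resumeWriterThreshold: 5,
            useSynchronizationContext: false);
        var pipe = new Pipe(options);

        // Write and flush 16 bytes (more than the pause threshold) without awaiting yet.
        ValueTask<FlushResult> flush = pipe.Writer.WriteAsync(new byte[16]);
        bool pausedBeforeRead = !flush.IsCompleted;

        // Consume everything, which drops the buffered amount below the resume threshold.
        ReadResult read = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(read.Buffer.End);

        // The pending flush can now complete.
        await flush;

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return pausedBeforeRead;
    }
}
```

Awaiting the flush before reading would never finish in this single-method sketch, which is why the read happens first.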
PipeScheduler
When using async/await, asynchronous code resumes on either a TaskScheduler or the current SynchronizationContext.
PipeScheduler provides fine-grained control over where the asynchronous callbacks run.
By default, the current SynchronizationContext is used. If there isn't one, the thread pool is used to run callbacks. In that case, PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool.
PipeScheduler.Inline
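Schedulers are chosen through PipeOptions when the Pipe is created. The following sketch uses a hypothetical factory (SchedulerConfig.CreateOptions) and assumes a caller that wants callbacks on the thread pool regardless of any ambient SynchronizationContext.

```csharp
using System.IO.Pipelines;

public static class SchedulerConfig
{
    // Hypothetical factory: reader and writer callbacks are both queued to the thread pool.
    public static PipeOptions CreateOptions()
    {
        return new PipeOptions(
            readerScheduler: PipeScheduler.ThreadPool,
            writerScheduler: PipeScheduler.ThreadPool,
            // Ignore the current SynchronizationContext and use the schedulers above.
            useSynchronizationContext: false);
    }
}
```

A pipe built with `new Pipe(SchedulerConfig.CreateOptions())` then uses these schedulers for its callbacks.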
IDuplexPipe
IDuplexPipe is a contract for types that support both reading and writing, such as a network connection.
IDuplexPipe represents one side of a full-duplex connection: it exposes the PipeReader and PipeWriter for that side only. What is written to the PipeWriter will not be read from the PipeReader of the same IDuplexPipe.
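One way to see why the two ends do not loop back is to build a connected pair from two Pipe instances. This is an illustrative sketch, not part of the library: InMemoryDuplexPipe and CreatePair are hypothetical names. Each side's Output feeds the other side's Input.

```csharp
using System.IO.Pipelines;

// Hypothetical in-memory implementation of IDuplexPipe, for illustration only.
public sealed class InMemoryDuplexPipe : IDuplexPipe
{
    private InMemoryDuplexPipe(PipeReader input, PipeWriter output)
    {
        Input = input;
        Output = output;
    }

    public PipeReader Input { get; }
    public PipeWriter Output { get; }

    // Creates two connected sides: what one side writes, the other side reads.
    public static (IDuplexPipe Client, IDuplexPipe Server) CreatePair()
    {
        var clientToServer = new Pipe();
        var serverToClient = new Pipe();

        var client = new InMemoryDuplexPipe(serverToClient.Reader, clientToServer.Writer);
        var server = new InMemoryDuplexPipe(clientToServer.Reader, serverToClient.Writer);
        return (client, server);
    }
}
```

Data written to `Client.Output` shows up on `Server.Input`, while `Client.Input` stays empty until the server writes something.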
Streams
See this page for more information on reading and writing streaming data.