Overview
PipeReader
manages memory on the caller’s behalf.
PipeReader.AdvanceTo
after calling PipeReader.ReadAsync
:
ReadAsync
returns aReadOnlySequence<byte>
that is only valid until the call toAdvanceTo
.- Using the return value after the call to
AdvanceTo
throws an exception.
PipeReader.AdvanceTo
takes two SequencePosition
arguments: one that marks how much data was consumed, and one that marks how much of the
buffer was observed.
Consumed — When data is marked as consumed, the pipe returns the memory to the underlying buffer pool.
Observed — When data is marked as observed, the next call to ReadAsync either:
- won’t return until there’s more data written to the pipe (assuming all day was marked as observed).
- return immediately with the observed and unobserved data (but not data already consumed).
Patterns for Reading Data with PipeReader
For examples below, consider this helper method:
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Reading a Single Message
Read a single message from a PipeReader
and return it to the caller:
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the parsed buffer as consumed.
// TryParseLines trims the buffer to point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call to ReadSingleMessageAsync will
// process the next message if there is one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Reading Multiple Messages
Read all messages from a PipeReader
and call ProcessMessageAsync
on each:
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Cancellation with PipeReader
PipeReader.ReadAsync
accepts a CancellationToken
. However, PipeReader.CancelPendingRead
cancels the current read operation without throwing
an exception. It causes the current or next call to ReadAsync
to return a ReadResult
with IsCanceled
set to true
. to This approach may be favorable in some cases.
Common Problems with PipeReader
See this page for more information.