manages memory on the caller’s behalf.
after calling PipeReader.ReadAsync
returns aReadOnlySequence<byte>
that is only valid until the call toAdvanceTo
.- Using the return value after the call to
throws an exception.
takes two SequencePosition
arguments: one that marks how much data was consumed, and one that marks how much of the
buffer was observed.
Consumed — When data is marked as consumed, the pipe returns the memory to the underlying buffer pool.
Observed — When data is marked as observed, the next call to ReadAsync either:
- won’t return until there’s more data written to the pipe (assuming all day was marked as observed).
- return immediately with the observed and unobserved data (but not data already consumed).
patterns for reading data with pipereader
For examples below, consider this helper method:
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
reading a single message
Read a single message from a PipeReader
and return it to the caller:
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader, CancellationToken cancellationToken = default)
while (true)
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
if (TryParseLines(ref buffer, out Message message))
// A single message was successfully parsed so mark the start of the parsed buffer as consumed.
// TryParseLines trims the buffer to point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call to ReadSingleMessageAsync will
// process the next message if there is one.
examined = consumed;
return message;
// There's no more data to be processed.
if (result.IsCompleted)
if (buffer.Length > 0)
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
reader.AdvanceTo(consumed, examined);
return null;
reading multiple messages
Read all messages from a PipeReader
and call ProcessMessageAsync
on each:
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
while (true)
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// Process all messages from the buffer, modifying the input buffer on each iteration.
while (TryParseLines(ref buffer, out Message message))
await ProcessMessageAsync(message);
// There's no more data to be processed.
if (result.IsCompleted)
if (buffer.Length > 0)
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
await reader.CompleteAsync();
cancellation with pipereader
accepts a CancellationToken
. However, PipeReader.CancelPendingRead
cancels the current read operation without throwing
an exception. It causes the current or next call to ReadAsync
to return a ReadResult
with IsCanceled
set to true
. to This approach may be favorable in some cases.
common problems with pipereader
See this page for more information.