overview
PipeReader manages memory on the caller’s behalf.
Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync:
- ReadAsync returns a ReadOnlySequence<byte> that is only valid until the call to AdvanceTo.
- Using the return value after the call to AdvanceTo is invalid, because the underlying memory may already have been returned to the pool and reused.
PipeReader.AdvanceTo takes two SequencePosition arguments: one that marks how much data was consumed, and one that marks how much of the
buffer was observed.
Consumed — When data is marked as consumed, the pipe returns the memory to the underlying buffer pool.
Observed — When data is marked as observed, the next call to ReadAsync either:
- won’t return until there’s more data written to the pipe (assuming all data was marked as observed).
- returns immediately with the observed and unobserved data (but not data already consumed).
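The effect of the two positions can be seen in a small sketch. It assumes an in-memory Pipe and a two-part ASCII write; the class name AdvanceToDemo and the payload are illustrative only and are not part of the original samples.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class AdvanceToDemo
{
    // Returns the text that the second ReadAsync call makes visible.
    public static async Task<string> RunAsync()
    {
        var pipe = new Pipe();

        // First write: a partial line with no terminator yet.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("Hel"));

        ReadResult first = await pipe.Reader.ReadAsync();
        ReadOnlySequence<byte> buffer = first.Buffer;

        // Nothing could be parsed: consume nothing, but mark the whole buffer
        // as examined so the next ReadAsync waits for new data.
        pipe.Reader.AdvanceTo(buffer.Start, buffer.End);

        // Second write: completes the line.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("lo\n"));

        // The unconsumed "Hel" comes back together with the new bytes.
        ReadResult second = await pipe.Reader.ReadAsync();
        string text = Encoding.ASCII.GetString(second.Buffer.ToArray());

        // Everything has been used, so mark it all as consumed.
        pipe.Reader.AdvanceTo(second.Buffer.End);

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return text;
    }
}
```

Because the first AdvanceTo call consumed nothing, the bytes from the first write stay in the pipe and are returned again, in front of the newly written bytes, by the second read.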
patterns for reading data with pipereader
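For the examples below, consider this helper method, which tries to parse one message from the start of the buffer and, on success, trims the buffer to the data after that message: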
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
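The helper is only declared here. One possible implementation might look like the following sketch, which assumes that each message is a single '\n'-terminated line of ASCII text. The Message type with a Text property and the LineParser class are hypothetical stand-ins, since the source does not define them.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical message type: the source does not define Message.
public sealed class Message
{
    public Message(string text) => Text = text;
    public string Text { get; }
}

public static class LineParser
{
    // Assumption: one message per '\n'-terminated line of ASCII text.
    public static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message)
    {
        // Look for the line terminator anywhere in the (possibly multi-segment) buffer.
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (newline is null)
        {
            // No complete line yet: leave the buffer untouched.
            message = default!;
            return false;
        }

        // Everything before the terminator is the message payload.
        ReadOnlySequence<byte> line = buffer.Slice(0, newline.Value);
        message = new Message(Encoding.ASCII.GetString(line.ToArray()));

        // Trim the buffer so it starts just after the terminator.
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        return true;
    }
}
```

Leaving the buffer untouched on failure matters for the callers: they use the remaining buffer's Start and End as the consumed and examined positions.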
reading a single message
Read a single message from a PipeReader and return it to the caller:
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;
        // In the event that no message is parsed successfully, mark consumed as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;
        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the parsed buffer as consumed. 
                // TryParseLines trims the buffer to point to the data after the message was parsed.
                consumed = buffer.Start;
                // Examined is marked the same as consumed here, so the next call to ReadSingleMessageAsync will 
                // process the next message if there is one.
                examined = consumed;
                return message;
            }
            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }
                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }
    return null;
}
reading multiple messages
Read all messages from a PipeReader and call ProcessMessageAsync on each:
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;
            try
            {
                // Process all messages from the buffer, modifying the input buffer on each iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }
                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}
cancellation with pipereader
PipeReader.ReadAsync accepts a CancellationToken and throws an OperationCanceledException if the token is canceled while a read is pending. Alternatively, PipeReader.CancelPendingRead cancels the current read operation without throwing
an exception. It causes the current or next call to ReadAsync to return a ReadResult with IsCanceled set to true. This approach is useful for stopping a read loop without relying on exceptions for control flow.
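A minimal sketch of a read loop that reacts to CancelPendingRead is shown below. It assumes an in-memory Pipe with no data written and requests the cancellation before the first read, so that the next ReadAsync call observes it; the class name CancelPendingReadDemo is illustrative only.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class CancelPendingReadDemo
{
    // Returns true if the loop exited because the read was canceled.
    public static async Task<bool> RunAsync()
    {
        var pipe = new Pipe();
        PipeReader reader = pipe.Reader;

        // Requested before the read starts, so the next ReadAsync call observes it.
        reader.CancelPendingRead();

        bool canceled = false;
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;
                try
                {
                    if (result.IsCanceled)
                    {
                        // The read was canceled: leave the loop without an exception.
                        canceled = true;
                        break;
                    }

                    // Message parsing would go here.

                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
                finally
                {
                    // Always pair ReadAsync with AdvanceTo.
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        finally
        {
            await reader.CompleteAsync();
        }
        return canceled;
    }
}
```

In a real application, CancelPendingRead would more likely be called from another part of the program while the loop is awaiting ReadAsync.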
common problems with pipereader
See this page for more information.