Pipe——高性能IO(一)

网友投稿 476 2022-06-19


System.IO.Pipelines是一个新的库,旨在简化在.NET中执行高性能IO的过程。它是一个依赖.NET Standard的库,适用于所有.NET实现。

Pipelines诞生于.NET Core团队,为使Kestrel成为业界最快的Web服务器之一。最初从作为Kestrel内部的实现细节发展成为可重用的API,它在.Net Core 2.1中作为可用于所有.NET开发人员的最高级BCL API(System.IO.Pipelines)提供。

它解决了什么问题?

为了正确解析Stream或Socket中的数据,代码有固定的样板,并且有许多极端情况,为了处理他们,不得不编写难以维护的复杂代码。

实现高性能和正确性,同时也难以处理这种复杂性。Pipelines旨在解决这种复杂性。

有多复杂?

让我们从一个简单的问题开始吧。我们想编写一个TCP服务器,它接收来自客户端的用行分隔的消息(由\n分隔)。(译者注:即一行为一条消息)

使用NetworkStream的TCP服务器

在Pipelines之前用.NET编写的典型代码如下所示:

async Task ProcessLinesAsync(NetworkStream stream)

{

var buffer = new byte[1024];

await stream.ReadAsync(buffer, 0, buffer.Length);

// 在buffer中处理一行消息

ProcessLine(buffer);

}

此代码可能在本地测试时正确工作,但它有几个潜在错误:

一次ReadAsync调用可能没有收到整个消息(行尾)。

它忽略了stream.ReadAsync()返回值中实际填充到buffer中的数据量。(译者注:即不一定将buffer填充满)

一次ReadAsync调用不能处理多条消息。

这些是读取流数据时常见的一些缺陷。为了解决这个问题,我们需要做一些改变:

我们需要缓冲传入的数据,直到找到新的行。

我们需要解析缓冲区中返回的所有行

async Task ProcessLinesAsync(NetworkStream stream)

{

var buffer = new byte[1024];

var bytesBuffered = 0;

var bytesConsumed = 0;

while (true)

{

var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);

if (bytesRead == 0)

{

// EOF 已经到末尾

break;

}

// 跟踪已缓冲的字节数

bytesBuffered += bytesRead;

var linePosition = -1;

do

{

// 在缓冲数据中查找找一个行末尾

linePosition = Array.IndexOf(buffer, (byte)‘\n‘, bytesConsumed, bytesBuffered - bytesConsumed);

if (linePosition >= 0)

{

// 根据偏移量计算一行的长度

var lineLength = linePosition - bytesConsumed;

// 处理这一行

ProcessLine(buffer, bytesConsumed, lineLength);

// 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n)

bytesConsumed += lineLength + 1;

}

}

while (linePosition >= 0);

}

}

这一次,这可能适用于本地开发,但一行可能大于1KiB(1024字节)。我们需要调整输入缓冲区的大小,直到找到新行。

因此,我们可以在堆上分配缓冲区去处理更长的一行。我们从客户端解析较长的一行时,可以通过使用ArrayPool避免重复分配缓冲区来改进这一点。

async Task ProcessLinesAsync(NetworkStream stream)

{

byte[] buffer = ArrayPool.Shared.Rent(1024);

var bytesBuffered = 0;

var bytesConsumed = 0;

while (true)

{

// 在buffer中计算中剩余的字节数

var bytesRemaining = buffer.Length - bytesBuffered;

if (bytesRemaining == 0)

{

// 将buffer size翻倍 并且将之前缓冲的数据复制到新的缓冲区

var newBuffer = ArrayPool.Shared.Rent(buffer.Length * 2);

Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);

// 将旧的buffer丢回池中

ArrayPool.Shared.Return(buffer);

buffer = newBuffer;

bytesRemaining = buffer.Length - bytesBuffered;

}

var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);

if (bytesRead == 0)

{

// EOF 末尾

break;

}

// 跟踪已缓冲的字节数

bytesBuffered += bytesRead;

do

{

// 在缓冲数据中查找找一个行末尾

linePosition = Array.IndexOf(buffer, (byte)‘\n‘, bytesConsumed, bytesBuffered - bytesConsumed);

if (linePosition >= 0)

{

// 根据偏移量计算一行的长度

var lineLength = linePosition - bytesConsumed;

// 处理这一行

ProcessLine(buffer, bytesConsumed, lineLength);

// 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n)

bytesConsumed += lineLength + 1;

}

}

while (linePosition >= 0);

}

}

这段代码有效,但现在我们正在重新调整缓冲区大小,从而产生更多缓冲区副本。它将使用更多内存,因为根据代码在处理一行行后不会缩缓冲区的大小。为避免这种情况,我们可以存储缓冲区序列,而不是每次超过1KiB大小时调整大小。

此外,我们不会增长1KiB的 缓冲区,直到它完全为空。这意味着我们最终传递给ReadAsync越来越小的缓冲区,这将导致对操作系统的更多调用。

为了缓解这种情况,我们将在现有缓冲区中剩余少于512个字节时分配一个新缓冲区:

public class BufferSegment

{

public byte[] Buffer { get; set; }

public int Count { get; set; }

public int Remaining => Buffer.Length - Count;

}

async Task ProcessLinesAsync(NetworkStream stream)

{

const int minimumBufferSize = 512;

var segments = new List();

var bytesConsumed = 0;

var bytesConsumedBufferIndex = 0;

var segment = new BufferSegment { Buffer = ArrayPool.Shared.Rent(1024) };

segments.Add(segment);

while (true)

{

// Calculate the amount of bytes remaining in the buffer

if (segment.Remaining < minimumBufferSize)

{

// Allocate a new segment

segment = new BufferSegment { Buffer = ArrayPool.Shared.Rent(1024) };

segments.Add(segment);

}

var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);

if (bytesRead == 0)

{

break;

}

// Keep track of the amount of buffered bytes

segment.Count += bytesRead;

while (true)

{

// Look for a EOL in the list of segments

var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)‘\n‘, bytesConsumedBufferIndex, bytesConsumed);

if (segmentIndex >= 0)

{

// Process the line

ProcessLine(segments, segmentIndex, segmentOffset);

bytesConsumedBufferIndex = segmentOffset;

bytesConsumed = segmentOffset + 1;

}

else

{

break;

}

}

// Drop fully consumed segments from the list so we don‘t look at them again

for (var i = bytesConsumedBufferIndex; i >= 0; --i)

{

var consumedSegment = segments[i];

// Return all segments unless this is the current segment

if (consumedSegment != segment)

{

ArrayPool.Shared.Return(consumedSegment.Buffer);

segments.RemoveAt(i);

}

}

}

}

(int segmentIndex, int segmentOffest) IndexOf(List segments, byte value, int startBufferIndex, int startSegmentOffset)

{

var first = true;

for (var i = startBufferIndex; i < segments.Count; ++i)

{

var segment = segments[i];

// Start from the correct offset

var offset = first ? startSegmentOffset : 0;

var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset);

if (index >= 0)

{

// Return the buffer index and the index within that segment where EOL was found

return (i, index);

}

first = false;

}

return (-1, -1);

}

此代码只是得到很多更加复杂。当我们正在寻找分隔符时,我们同时跟踪已填充的缓冲区序列。为此,我们此处使用List查找新行分隔符时表示缓冲数据。其结果是,ProcessLine和IndexOf现在接受List作为参数,而不是一个byte[],offset和count。我们的解析逻辑现在需要处理一个或多个缓冲区序列。

我们的服务器现在处理部分消息,它使用池化内存来减少总体内存消耗,但我们还需要进行更多更改:

我们使用的byte[]和ArrayPool的只是普通的托管数组。这意味着无论何时我们执行ReadAsync或WriteAsync,这些缓冲区都会在异步操作的生命周期内被固定(以便与操作系统上的本机IO API互操作)。这对GC有性能影响,因为无法移动固定内存,这可能导致堆碎片。根据异步操作挂起的时间长短,池的实现可能需要更改。

可以通过解耦读取逻辑和处理逻辑来优化吞吐量。这会创建一个批处理效果,使解析逻辑可以使用更大的缓冲区块,而不是仅在解析单个行后才读取更多数据。这引入了一些额外的复杂性

我们需要两个彼此独立运行的循环。一个读取Socket和一个解析缓冲区。

当数据可用时,我们需要一种方法来向解析逻辑发出信号。

我们需要决定如果循环读取Socket“太快”会发生什么。如果解析逻辑无法跟上,我们需要一种方法来限制读取循环(逻辑)。这通常被称为“流量控制”或“背压”。

我们需要确保事情是线程安全的。我们现在在读取循环和解析循环之间共享多个缓冲区,并且这些缓冲区在不同的线程上独立运行。

内存管理逻辑现在分布在两个不同的代码段中,从填充缓冲区池的代码是从套接字读取的,而从缓冲区池取数据的代码是解析逻辑。

我们需要非常小心在解析逻辑完成之后我们如何处理缓冲区序列。如果我们不小心,我们可能会返回一个仍由Socket读取逻辑写入的缓冲区序列。

复杂性已经到了极端(我们甚至没有涵盖所有案例)。高性能网络应用通常意味着编写非常复杂的代码,以便从系统中获得更高的性能。

System.IO.Pipelines的目标是使这种类型的代码更容易编写。

使用System.IO.Pipelines的TCP服务器

让我们来看看这个例子的样子System.IO.Pipelines:

async Task ProcessLinesAsync(Socket socket)

{

var pipe = new Pipe();

Task writing = FillPipeAsync(socket, pipe.Writer);

Task reading = ReadPipeAsync(pipe.Reader);

return Task.WhenAll(reading, writing);

}

async Task FillPipeAsync(Socket socket, PipeWriter writer)

{

const int minimumBufferSize = 512;

while (true)

{

// 从PipeWriter至少分配512字节

Memory memory = writer.GetMemory(minimumBufferSize);

try

{

int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);

if (bytesRead == 0)

{

break;

}

// 告诉PipeWriter从套接字读取了多少

writer.Advance(bytesRead);

}

catch (Exception ex)

{

LogError(ex);

break;

}

// 标记数据可用,让PipeReader读取

FlushResult result = await writer.FlushAsync();

if (result.IsCompleted)

{

break;

}

}

// 告诉PipeReader没有更多的数据

writer.Complete();

}

async Task ReadPipeAsync(PipeReader reader)

{

while (true)

{

ReadResult result = await reader.ReadAsync();

ReadOnlySequence buffer = result.Buffer;

SequencePosition? position = null;

do

{

// 在缓冲数据中查找找一个行末尾

position = buffer.PositionOf((byte)‘\n‘);

if (position != null)

{

// 处理这一行

ProcessLine(buffer.Slice(0, position.Value));

// 跳过 这一行+\n (basically position 主要位置?)

buffer = buffer.Slice(buffer.GetPosition(1, position.Value));

}

}

while (position != null);

// 告诉PipeReader我们以及处理多少缓冲

reader.AdvanceTo(buffer.Start, buffer.End);

// 如果没有更多的数据,停止都去

if (result.IsCompleted)

{

break;

}

}

// 将PipeReader标记为完成

reader.Complete();

}

我们的行读取器的pipelines版本有2个循环:

FillPipeAsync从Socket读取并写入PipeWriter。

ReadPipeAsync从PipeReader中读取并解析传入的行。

与原始示例不同,在任何地方都没有分配显式缓冲区。这是管道的核心功能之一。所有缓冲区管理都委托给PipeReader/PipeWriter实现。

这使得使用代码更容易专注于业务逻辑而不是复杂的缓冲区管理。

在第一个循环中,我们首先调用PipeWriter.GetMemory(int)从底层编写器获取一些内存; 然后我们调用PipeWriter.Advance(int)告诉PipeWriter我们实际写入缓冲区的数据量。然后我们调用PipeWriter.FlushAsync()来提供数据给PipeReader。

在第二个循环中,我们正在使用PipeWriter最终来自的缓冲区Socket。当调用PipeReader.ReadAsync()返回时,我们得到一个ReadResult包含2条重要信息,包括以ReadOnlySequence形式读取的数据和bool IsCompleted,让reader知道writer是否写完(EOF)。在找到行尾(EOL)分隔符并解析该行之后,我们将缓冲区切片以跳过我们已经处理过的内容,然后我们调用PipeReader.AdvanceTo告诉PipeReader我们消耗了多少数据。

在每个循环结束时,我们完成了reader和writer。这允许底层Pipe释放它分配的所有内存。

System.IO.Pipelines

除了处理内存管理之外,其他核心管道功能还包括能够在Pipe不实际消耗数据的情况下查看数据。

PipeReader有两个核心API ReadAsync和AdvanceTo。ReadAsync获取Pipe数据,AdvanceTo告诉PipeReader不再需要这些缓冲区,以便可以丢弃它们(例如返回到底层缓冲池)。

这是一个http解析器的示例,它在接收Pipe到有效起始行之前读取部分数据缓冲区数据。

ReadOnlySequence

该Pipe实现存储了在PipeWriter和PipeReader之间传递的缓冲区的链接列表。PipeReader.ReadAsync暴露一个ReadOnlySequence新的BCL类型,它表示一个或多个ReadOnlyMemory段的视图,类似于Span和Memory提供数组和字符串的视图。

该Pipe内部维护指向reader和writer可以分配或更新它们的数据集合,。SequencePosition表示缓冲区链表中的单个点,可用于有效地对ReadOnlySequence进行切片。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:C# Moq(cctv5)
下一篇:转:C# String为值类型还是引用类型(转承起合的成语)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~