关于System.IO.Pipelines的简介

Pipeline是什么

System.IO.Pipelines是一个随 .net core 2.1一起发布的新的IO相关的基础库,它没有随 .net core 2.1一起发布,而是需要采用NuGet Package的方式独立安装。很明显,Pipelines库中使用了大量 .net standard 2.0及以后的基础类型(诸如Span, Memory等),相关的API也是全新风格,因此如果想在之前的项目中使用则需要升级 .net版本。

Pipeline能解决什么问题

在Pipeline出来之前,我们在C#中一般会使用Stream来实现相关的IO操作,比如使用NetworkStream从一个Socket中读写网络数据。比如下面这段代码就是一个简单的例子:

1
2
3
4
5
6
7
8
9
async Task Read(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);

//buffer handler
//...
}

每次尝试从网络中读取数据的时候,需要预先申请一块内存当buffer,然后再尝试从socket中读取数据写入到这块buffer中。通常来说我们需要做各种优化,比如buffer的循环使用,并且实际应用时,我们通常可能需要在另一个线程去使用这些从socket中来的数据。但是这样以来,又会牵扯到另一个问题–线程安全,并且与此同时还要保证性能,我们需要非常小心且聪明的编写我们的代码才能避免出现各bug。而Pipeline的出现,极大程度上帮我们减小了解决这个问题的负担。

如何使用Pipeline

在Pipeline库中,最基础的类型是Pipe,每个Pipe拥有Reader和Writer两个成员。顾名思义,Pipe.Reader负责从Pipe中读取数据,Pipe.Writer负责往Pipe中写入数据,就好比一根水管的两端,你可以从一段往里面塞东西,也可以从另一端取东西。并且Pipe底层的实现已经帮我们保证了Reader和Writer的方法的线程安全性。但是需要注意的是,由于目前 .net core相关文档的缺失,实际使用中我还是发现了一些坑。以代码为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void readLoop()
{
Task.Run(async () =>
{
Exception error = null;
while (!Cancel.IsCancellationRequested)
{
try
{
var buffer = pipe.Writer.GetMemory(1024);
int bytesRecv = await tcpclient.Client.ReceiveAsync(buffer, SocketFlags.None);
if (bytesRecv == 0)
{
break;
}

// Tell the PipeWriter how much was read from the Socket
pipe.Writer.Advance(bytesRecv);
}
catch (Exception e)
{
error = e;
break;
}

var result = await pipe.Writer.FlushAsync(Cancel.Token);
if (result.IsCompleted || result.IsCanceled)
{
break;
}
}

pipe.Writer.Complete(error);
Close(PipelineCompleteException.Instance);
});
}

这是一段使用Pipeline从socket中读取数据的代码,在一个循环中我们首先从Pipe中申请一块内存,然后往这块内存中写入从socket中读取的数据。然后需要显式的调用Pipe.Writer.Advance()去告诉Pipe的另一端也就是Pipe.Reader我们写入了这么多数据,最后再调用Pipe.Writer.FlushAsync()。这样,我们就可以在别的线程使用Pipe.Reader.ReadAsync()来读取数据了。

1
2
3
4
5
6
7
var result = await pipe.Reader.ReadAsync(cancel.Token);
if (result.IsCompleted)
{
break;
}
//需要手动通知Writer我们已经读了多少,这些内存已经可以回收了
pipe.Reader.AdvanceTo(result.Buffer.End);

这是一段从Pipeline中读取数据的代码,需要注意的是在每次使用Reader.ReadAsync()之后必须要调用一次Reader.AdvanceTo(),不然是没有办法调用下次ReadAsync的。之所以这样做原因也很简单,Pipeline底层是有一块自己管理的buffer的,用户需要写的时候会从里面申请内存,用完了以后会放回去,这样才能做到循环使用避免造成过多的gc或者memory leak。另外因为AdvanceTo的语义是告诉Pipe.Writer我已经从Pipe中消费了多少数据,这部分数据的内存已经可以回收了或者说可以往里面写入了。如果同时有多个线程同时调用ReadAsync,那么每个线程读取到的数据指向的一块内存地址是可能会有重叠的部分,而当其中任意一个线程调用AdvanceTo的时候这部分内存地址就会被回收,可能会被写入新的内容,而其他的线程如果这时候还在尝试读取这块数据,就会出现各种奇怪的问题。另一个就是result的值是需要判断的,比如result.IsCompleted,这时候表示Writer已经完成所有数据往Pipe里的写入了,没有新的数据会被写入进Pipe里。这时候我们需要判断,并且决定接下来的行为(是继续等待还是结束)。

通过上面的例子我们不能看出使用Pipeline编写的代码无需我们手动添加任何lock或者手动管理内存,并且更重要的是它是原本较为复杂的代码结构变得非常的简洁和清晰。因为在我的经验中,任何涉及多线程的代码中,保持结构的简洁和清晰是非常重要的,一段各种循环/lock嵌套,同步异步方法混用的代码绝对是埋下的炸弹,往往只有出现bug的时候你才能发现原来这种当时没有考虑到(事实上你往往很难想清楚所有的情况)的情况会引发诸如xxx的问题。

更多的参考

如今自 .net core 2.1出来也已经过了一段时间,关于 .net core中的新内容,已经有很多人在尝试并且研究,其中不乏关于Pipeline的内容。下面是我了解并且推荐的的一些比较优质的博客:

  • David Fowler的这篇博客用一个简单的应用场景介绍了如何使用Pipeline以及和老的API相比它又有什么优势

  • Marc Gravell的Pipe Dreams系列博客,非常详细的介绍了Pipeline的用途以及他关于如何使用Pipeline的更深层次的思考

关于System.IO.Pipelines的简介

http://xulanting.net/2019/03/04/post4/

Author

John Doe

Posted on

2019-03-04

Updated on

2021-02-07

Licensed under

You need to set install_url to use ShareThis. Please set it in _config.yml.

Comments

You forgot to set the shortname for Disqus. Please set it in _config.yml.