HashedWheelTimer

为什么我们需要Timer?

经常写网络层代码的人肯定不会对定时器(Timer)陌生,因为有很多应用场景会需要用到定时器或者与之相似的东西,比如:

  • delay一段时间再执行某个方法
  • 等待某个事件直到超过一定的时间上限
  • 以一定的间隔循环执行某些方法

而在 .net中(广义的 .net包括 .net framework, .net core, .net standard, xamarin等),一共有四种定时器(Timer)类型,分别是:

  • System.Windows.Forms.Timer(.NET Framework only):windows窗体程序使用的定时器,只能在单线程中使用

  • System.Web.UI.Timer(.NET Framework only):ASP.NET使用的定时器

  • System.Threading.Timer:可以在指定的时间间隔执行方法,也可通过传入时间参数控制其行为,但是此类无法继承并且传入的方法一旦确定就无法修改,几乎所有的控制权都由 .net runtime掌控

  • System.Timers.Timer:主要用于在程序中生成定时的事件,不是所有的 .net版本都支持

考虑到适用性,一般在 .net core中使用的都是System.Threading.Timer

.net的Timer存在的一些问题

一般来说,我们会这么使用System.Threading.Timer

1
2
3
4
var timer = new Timer((object)=>{
//do something
}),null, 1000, 250);.
timer.Change(0, 500);

上面代码先是new了一个System.Threading.Timer的实例,在构造这个实例的时候,需要传入你希望执行的回调函数,执行回调的所需要的存储的状态(如果不需要则传入null即可),第一次执行回调的delay的时间,以及之后每次执行回调的时间间隔(如果这个值设置为Timeout.Infinite则不会重复执行回调).但是实际使用的时候会发现,如果够早了大量的Timer的话,会导致函数执行回调的时间精度有显著地下降.同时Timer本身还存在一个问题,就是回调函数的执行时间会有一些误差,因为Timer的实现依赖底层的操作系统.在Windows上,一般这个误差是在15ms左右,详细信息可以查看MSDN上的文档.

浏览System.Threading.Timer的实现,不难发现问题的原因.因为在当前版本的.net的实现里面,new一个Timer对象的时候,实际上做的事情相当于是把传进来的回调插入到一个在线程池中工作的队列中,而唤醒这个回调函数的时候则需要去这个队列里面查找时间到了的回调函数,随着Timer数量的增加,队列的长度也会明显的增加,这时候再插入或者唤醒就会有显著的开销.

HashWheelTimer

这时候就轮到我们的HashWheelTimer登场了,考虑一下,什么样的数据结构插入的时间复杂度最低?不难发现是HashTable,因为其插入的时间复杂度是o(1),那么接下来就要考虑一下如何将这个特点应用到Timer的查找中.如果我们对时间精度要求不是非常的高(一般大部分应用需要的时间精度都不会小于1ms),那么我们可以构建这样一张HashTable,这张表中每个槽位表示一个最小时间精度的间隔比如10ms,槽位用于存放回调函数.这样在这张表里第一个槽位存放的就是10ms后需要执行的回调函数,第二个槽位里面存放的就是20ms后要执行的回调函数,以此类推…

进一步分析,可以将原来的HashTable改为一个ring buffer,这样就可以循环使用,不过这样也限制了最大delay的时间(虽然实际使用中我们很少会去delay很长一段时间再去执行某个操作).同时我们需要保留有一个指向当前槽位的记录,这样这个定时器的工作流程可以概括为:每次await一定的时间(最小时间精度),然后将指向当前槽位的索引记录+1,取出这个槽位里的回调函数,依次执行,最后再继续await直到下一次循环开始.

不过需要注意的一点是,既然在 .net core中,await Task.Delay()这一行为本身是会产生一些误差的,那么长时间执行以后难免会有较大的累积误差.如何解决这个累积误差也很简单,引入一个执行次数的计数,暂且称之为execCount和我们HashWheelTimer的执行总时间.每次循环开始时用执行总时间除以最小时间精度得到值和execCount相比较,如果大于说明执行的次数不够,则这次循环执行完回调函数后不再等待,直接进行下一次循环,这样就可以从总体上避免累积误差的产生.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Task.Run(async () =>
{
//初始化总执行时间和执行次数计数
sw.Start();
var execCount = 0;
//HashWheelTimer主循环
while (true)
{
if (( sw.ElapsedMilliseconds / tickDuration ) > execCount)
{
//执行delay的回调函数
doCallBack();
execCount++;
}
else
{
//await一个最小时间精度
await Task.Delay(tickDuration);
}
}
});

总结

经过上述步骤,我们就可以得到一个初步可用的HashWheelTimer,它拥有o(1)时间复杂度的插入操作,同时也避免了累积误差的出现.
但同时它也有两个缺点:

  • 1.受限于最小时间精度
  • 2.最大delay时间有上限

所以,是否使用它还是得结合项目的实际情况去看.

PS:在.net core 2.1中,我看到Timer已经引入一些优化(虽然并没有解决根本性的问题),比如根据Environment.ProcessorCount来决定Timer工作队列的数量,每次会把新构造的TimerCallBack分配到不同的队列中等等,也许在不久的未来,我们甚至可以看到标准库中引入HashWheelTimer.

关于System.IO.Pipelines的简介

Pipeline是什么

System.IO.Pipelines是一个随 .net core 2.1一起发布的新的IO相关的基础库,它没有随 .net core 2.1一起发布,而是需要采用NuGet Package的方式独立安装。很明显,Pipelines库中使用了大量 .net standard 2.0及以后的基础类型(诸如Span, Memory等),相关的API也是全新风格,因此如果想在之前的项目中使用则需要升级 .net版本。

Pipeline能解决什么问题

在Pipeline出来之前,我们在C#中一般会使用Stream来实现相关的IO操作,比如使用NetworkStream从一个Socket中读写网络数据。比如下面这段代码就是一个简单的例子:

1
2
3
4
5
6
7
8
9
async Task Read(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);

//buffer handler
//...
}

每次尝试从网络中读取数据的时候,需要预先申请一块内存当buffer,然后再尝试从socket中读取数据写入到这块buffer中。通常来说我们需要做各种优化,比如buffer的循环使用,并且实际应用时,我们通常可能需要在另一个线程去使用这些从socket中来的数据。但是这样以来,又会牵扯到另一个问题–线程安全,并且与此同时还要保证性能,我们需要非常小心且聪明的编写我们的代码才能避免出现各bug。而Pipeline的出现,极大程度上帮我们减小了解决这个问题的负担。

如何使用Pipeline

在Pipeline库中,最基础的类型是Pipe,每个Pipe拥有Reader和Writer两个成员。顾名思义,Pipe.Reader负责从Pipe中读取数据,Pipe.Writer负责往Pipe中写入数据,就好比一根水管的两端,你可以从一段往里面塞东西,也可以从另一端取东西。并且Pipe底层的实现已经帮我们保证了Reader和Writer的方法的线程安全性。但是需要注意的是,由于目前 .net core相关文档的缺失,实际使用中我还是发现了一些坑。以代码为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void readLoop()
{
Task.Run(async () =>
{
Exception error = null;
while (!Cancel.IsCancellationRequested)
{
try
{
var buffer = pipe.Writer.GetMemory(1024);
int bytesRecv = await tcpclient.Client.ReceiveAsync(buffer, SocketFlags.None);
if (bytesRecv == 0)
{
break;
}

// Tell the PipeWriter how much was read from the Socket
pipe.Writer.Advance(bytesRecv);
}
catch (Exception e)
{
error = e;
break;
}

var result = await pipe.Writer.FlushAsync(Cancel.Token);
if (result.IsCompleted || result.IsCanceled)
{
break;
}
}

pipe.Writer.Complete(error);
Close(PipelineCompleteException.Instance);
});
}

这是一段使用Pipeline从socket中读取数据的代码,在一个循环中我们首先从Pipe中申请一块内存,然后往这块内存中写入从socket中读取的数据。然后需要显式的调用Pipe.Writer.Advance()去告诉Pipe的另一端也就是Pipe.Reader我们写入了这么多数据,最后再调用Pipe.Writer.FlushAsync()。这样,我们就可以在别的线程使用Pipe.Reader.ReadAsync()来读取数据了。

1
2
3
4
5
6
7
var result = await pipe.Reader.ReadAsync(cancel.Token);
if (result.IsCompleted)
{
break;
}
//需要手动通知Writer我们已经读了多少,这些内存已经可以回收了
pipe.Reader.AdvanceTo(result.Buffer.End);

这是一段从Pipeline中读取数据的代码,需要注意的是在每次使用Reader.ReadAsync()之后必须要调用一次Reader.AdvanceTo(),不然是没有办法调用下次ReadAsync的。之所以这样做原因也很简单,Pipeline底层是有一块自己管理的buffer的,用户需要写的时候会从里面申请内存,用完了以后会放回去,这样才能做到循环使用避免造成过多的gc或者memory leak。另外因为AdvanceTo的语义是告诉Pipe.Writer我已经从Pipe中消费了多少数据,这部分数据的内存已经可以回收了或者说可以往里面写入了。如果同时有多个线程同时调用ReadAsync,那么每个线程读取到的数据指向的一块内存地址是可能会有重叠的部分,而当其中任意一个线程调用AdvanceTo的时候这部分内存地址就会被回收,可能会被写入新的内容,而其他的线程如果这时候还在尝试读取这块数据,就会出现各种奇怪的问题。另一个就是result的值是需要判断的,比如result.IsCompleted,这时候表示Writer已经完成所有数据往Pipe里的写入了,没有新的数据会被写入进Pipe里。这时候我们需要判断,并且决定接下来的行为(是继续等待还是结束)。

通过上面的例子我们不能看出使用Pipeline编写的代码无需我们手动添加任何lock或者手动管理内存,并且更重要的是它是原本较为复杂的代码结构变得非常的简洁和清晰。因为在我的经验中,任何涉及多线程的代码中,保持结构的简洁和清晰是非常重要的,一段各种循环/lock嵌套,同步异步方法混用的代码绝对是埋下的炸弹,往往只有出现bug的时候你才能发现原来这种当时没有考虑到(事实上你往往很难想清楚所有的情况)的情况会引发诸如xxx的问题。

更多的参考

如今自 .net core 2.1出来也已经过了一段时间,关于 .net core中的新内容,已经有很多人在尝试并且研究,其中不乏关于Pipeline的内容。下面是我了解并且推荐的的一些比较优质的博客:

  • David Fowler的这篇博客用一个简单的应用场景介绍了如何使用Pipeline以及和老的API相比它又有什么优势

  • Marc Gravell的Pipe Dreams系列博客,非常详细的介绍了Pipeline的用途以及他关于如何使用Pipeline的更深层次的思考