C++ 大规模并行 Socket 处理算法

标题:Алгоритм массово-параллельной работы с сокетами для C++

日期:2025/06/05

作者:Владислав Шпилевой

链接:https://www.youtube.com/watch?v=DK0bPIsyJY8

注意:此为 AI 翻译生成 的中文转录稿,详细说明请参阅仓库中的 README 文件。

备注一:造轮子现场,还 TLA+,挺高端的。为什么作者说 Asio 代码不可读?我看的部分感觉还好。

备注二:有时间我再进一步看下,应该挺不错的。


主持人: 今天,Vladislav Shpilevoy 将为大家介绍一个类似于 Boost.Asio 的东西,或者可以说是打了类固醇的 libuv。他会更侧重于算法本身,而不是具体实现,但其实现也是开源的,并且已经在实际的生产项目中使用。Vlad,请问这个演讲对于那些不打算编写自己的事件循环(event loop)——我已经忘了“loop”怎么翻译了——的人来说,会有用吗?

Vladislav: 绝对会有趣,至少我希望如此。首先,对于那些显然要去实现它的人。其次,对于那些仅仅好奇事件循环内部是如何工作的人,通过了解一些内部细节,你可能会学会如何更有效地使用它们——不一定是具体的 Boost.Asio,尽管我也会提到一些关于它的内容。第三,我希望这个演讲本身就足够有趣,即使对那些不打算写 event loop 或者已经对它们了如指掌的人也是如此。后面会有动画,请大家坚持看到最后。第四,即使你不打算写自己的 event loop,也许我的算法或实现中的某些想法会让你觉得有趣,可以借鉴到你自己的东西里,而这些东西甚至可能不是 event loop。也就是说,这次会议是关于 C++ 的,所以你可能不使用 uServer,但你仍然可以借鉴 Anton 今天早些时候分享的一些技巧。这里的逻辑是一样的。

主持人: 在我们进入演讲之前,我想简单介绍一下演讲者。Vlad 是系统编程课程的作者,他在大学里设计并讲授了这门课程,并且在接下来的几年里也由其他人继续讲授。他曾致力于解决与分片(sharding)、同步/异步复制、SWIM 谣言传播算法、Raft 领导者选举以及各种网络代码等相关的不同任务。今天 Vlad 有很多东西要和我们分享。让我们欢迎他!

Vladislav: 谢谢 Alexander 这样介绍我。如他所说,也如大家所知,我叫 Vlad。我写 C 和 C++ 代码,纯 C 我也写了很长时间,现在也还在写。这在某种程度上影响了我的编程风格,即使是在写 C++ 的时候。我已经写了10年多了。在这段时间里,我在不同领域工作过,包括数据库、游戏,现在具体是在广告行业,甚至在职业生涯早期还接触过一点卫星遥测。在所有这些领域,我都有很多机会和网络代码打交道。不知怎的,我总是接到那些需要提升网络代码性能或者从零开始实现某些功能的任务。

今天,我将向大家展示一个我自认为——主观上——在处理大规模 Socket 方面非常高效的算法。从程序员的角度来看,这构成了网络编程的大部分工作。我用 C++ 实现了这个算法,它完全开源,你们可以拿去看、去运行。你们可能以前没有见过类似的东西,至少我在实现它之前没见过。这不一定是说它有多好,我的意思是,你们都明白,同一个解决方案在不同的场景下可能更好,也可能更差。例如,很多人可能知道,冒泡排序(Bubble Sort)可能比快速排序(Quick Sort)更快,这取决于你的数组有多大,以及它已经有多大程度的有序性。所以,没有完美的万能解决方案。但在我工作过的那些场景和项目中,这个算法被证明比我当时能用的其他方案更快、更方便。

我需要强调,今天我们专注于算法。也就是说,我不会深入探讨实现的内部细节。我认为这样做价值不大,因为它已经完全开源了,任何人都可以自己去阅读具体的实现细节。几年前我的上一个演讲中讲过一些非常底层的细节。大家也可以问我问题。代码的文档非常详尽,我稍后会解释为什么它如此简单且文档齐全——这是有原因的。是的,我相信演讲中最有趣的部分应该是关于算法,而不是某个具体的实现。希望你们会喜欢。让我们开始吧。

今天的计划是这样的:

  1. 首先,为什么要在 C++ 中编写网络代码?

  2. 在 C++ 和 C 中,现有的用于大规模并行处理 Socket 的解决方案有哪些?

  3. 我提出的新算法是什么样的?

  4. 它的具体代码使用示例。

  5. 最后,我们自然要看基准测试(benchmarks),看看我写的东西到底有多大意义。

演讲结束后,二维码会变得可用,现在演讲是关闭的。链接里也会有这个完整的演示文稿。GitHub 上的项目已经开源,可以打开、查看和使用。

为什么要用 C++ 编写网络代码?

让我们从这个问题开始:为什么要用 C++ 写网络代码?这是我有时会问自己的问题,尤其是在当今这个有 Go 的 goroutine、Rust 的 Tokio、Java 的新型虚拟线程的世界里。为什么还要用 C++ 写代码呢?它很复杂,很容易出错。

在我的职业生涯中,我见过几个原因:

  1. 代码已经是用 C++ 写的了。 你没办法摆脱它,重写它需要数年时间,甚至根本不可能。例如,在我当前的项目中,有17万行 C++ 代码。重写它需要多少时间?里面有多少业务逻辑和测试?这不现实。或者我在 Tarantool 数据库工作,那里有30万行 C 代码和一些 C++,同样也基本不可能重写。

  2. 你可能非常自信,并且在 C++ 方面已经有深厚的专业知识, 可以很舒服地用它来编写新服务。这种情况也存在,可能在座的许多人就是这样做的。毕竟,这是一个 C++ 大会。

  3. 最后但同样重要的原因:你需要极致的性能。 据我所知,在榨干硬件最后一丝性能方面,C++ 仍然难以被超越。

今天我们专注于最后一个原因,因为其他的我们基本无法影响,但性能方面我们大有可为。

现有的解决方案

让我们看看我们开箱即用、在语言层面已经拥有的东西。不幸的是,C++ 标准库里什么都没有。但在 C 标准库里,有我们大多数人可能都熟悉的函数,比如打开 Socket 的 socket() 函数、send()recv()connect()listen()accept()bind() 等等。这是一个标准,到处都有。你甚至可能会惊讶,在 Windows 上也有,至少是以一种略微残缺的形式存在。

接下来,当你需要大规模处理 Socket,而不是像这样一次处理一个时,不同的平台就有各自的特殊处理方式了。例如:

  • 在 macOS 上有 kqueue

  • 在 Linux 上有 epollio_uring

  • 在 Windows 上有 I/O Completion Ports (IOCP)AIO Completion Ports,还有一个新的时髦玩意儿也叫 io_uring

这些都是很好的解决方案。实际上,用它们可以写出非常快、大规模并行的代码。但首先,它们不是标准的。也就是说,你为 kqueue 写了一个实现,就必须为 Linux 的 epoll 或 io_uring 再写一遍,如果需要的话,还要维护 Windows 版本。其次,这些都是 C 风格的 API,在 C++ 代码中有时看起来不太协调,除非你用自己的 C++ 包装器把它们隐藏起来。

原则上,这些包装器已经存在了,甚至不需要自己写。有很多项目会为你使用所有这些函数,并提供一个漂亮的 API。不幸的是,如果我们只想要那些流行的、经过实战检验、有社区支持、灵活的解决方案,那么选择就不多了。

我能说出几个,可能还有更多,但最流行和耳熟能详的有:

  • Boost.Asio:这基本上就是一个为异步网络编程设计的库,它就是对 epoll、IOCP 等的封装。这是一个非常棒的项目,说真的。它有庞大的社区,历史悠久,意味着它在无数公司和项目中经过了实战检验。使用起来也很方便,特别是当你理解了像 strandio_context 这样的概念后——这些是非常强大的东西。你可以在社区里提问,也有很多示例可以让你上手学习。而且,这是你会写在简历里的技能,也是招聘要求里会提到的东西,比如:“我们的项目使用 Boost.Asio,你必须了解它。”

还有哪些替代方案?

  • uServer:当然,在这次会议上它已经被提到了,之前的会议也都有人讲。它不仅仅是一个网络库,而是一个用于开发后端的完整框架。它们有数据库连接器、协程,功能非常强大。

  • Seastar:这是 ScyllaDB 的一个项目,同样用于编写异步后端。它从 ScyllaDB 中分离出来,现在似乎可以独立使用。

  • 还有一大堆小一些的解决方案,比如 libuv,它与网络相关但又不完全是,还有 ThriftgRPClibevlibevent。可能还有很多其他的。

如果已经有这么多解决方案,尤其是已经有了 Boost.Asio,我今天到底在讲什么呢?也许我们现在就可以结束了,我告诉你们:“去用 Boost.Asio 吧。”

我真的会这么告诉你们:如果你只是需要一个正常的网络框架/库,能够覆盖你绝大部分的需求,那就去用 Boost.Asio,别折腾了。 当我可以的时候,我个人也会这么做。因为这样更容易为项目招到新的开发人员,他们可能在加入之前就已经了解 Boost.Asio 了。而且我们都知道,Boost 里的东西最终会进入 C++ 标准。也许 Boost.Asio 里的某些东西也会被标准化,这样你就能更容易地把代码移植到标准库。

但很不幸,虽然像 Boost 这样的解决方案覆盖了 99.999% 的情况,但总有那剩下的 0.001% 的情况,什么都帮不了你,你必须自己发明轮子,或者用一些非常小众的东西。

让我们看看在我之前列举的那些项目中,可能会有哪些原因(导致无法使用现有方案)。我希望我不是唯一一个遇到这些问题的人,也许我只是个糟糕的程序员,但是:

  • Boost.Asio 的代码对我来说是不可读的。 当我打开它时,我无法理解里面发生了什么。有文档,有代码,当非常需要的时候,我可以花时间深入研究,理解某个具体细节的作用。但我个人很难理解 Boost.Asio 的整体架构。

  • 这不是我当时发明这个算法时不能用它的主要原因。为什么我不能用 uServer它不支持 Windows。 这对你来说可能不那么重要,但在游戏开发(GameDev)行业,Windows 甚至在后端服务器上都用得很多。你就是需要它。

  • Seastar 也不支持 Windows 和 macOS。macOS 作为开发环境非常方便。我喜欢在 Mac 上写代码,当我的代码能在 Mac 上运行时,我会觉得很方便。Seastar 不支持。但 Windows 是一个硬性阻碍。

  • Thrift/gRPC 强迫你使用它们自己的二进制协议。但也许你的项目已经有自己的协议了,比如在 Tarantool 里是 MessagePack。我不知道怎么把 Thrift/gRPC 移植到 MessagePack 上,也许有办法。

  • libuv/libevent 是单线程解决方案,而且是 C 语言的。至少 libuv 是 C 的,libevent 我不确定,但它也是单线程的,据我所知。

但是,所有这些原因,在一个终极理由面前都黯然失色,那就是上面直接告诉你:“不行”

比如,当我在那家游戏开发公司工作时——我不确定我是否能说出公司名字,因为我已经不在那里了——但情况是,他们有自己的标准库。公司有一万名员工,有专门的部门负责开发这个库。你不能从外部引入任何东西,除非它几乎已经是直接的系统调用了。还有各种小众平台,比如 PlayStation、Xbox、任天堂的各种设备。它们大多数的 API 或多或少都是 Linux,但它们有自己的一套你必须使用的 API,而且看起来和任何东西都不像。

我当时遇到的就是这种情况。所以,当别人直接告诉你“不行”时,其他一切都变得不重要了。

于是,我发明了自己的算法和实现,我对它有以下几个具体要求:

  1. 公平性 (Fairness):在线程间负载分配的公平性。我希望当我有大量 Socket 和大量线程时,所有线程的 CPU 使用率都大致相同,而不是一个线程 100%,其他线程 30%。这意味着我不能将 Socket 固定(pin)到某个线程上。也就是说,我不能用轮询(Round-Robin)的方式接收 Socket 并将它们分配给某个特定线程,然后就一直留在那里。在游戏开发中,会话的持续时间长短不一。例如,我最初为之设计这个东西的项目甚至不是游戏服务器,而是一个处理存档的服务器。请求的持续时间可能从几微秒到几百毫秒不等。如果一个线程上堆积了大量这种“重”请求,而其他线程上很少,那会怎么样?我需要 CPU 分配的绝对公平。

  2. 从某种意义上说,这就引出了下一个需求:我需要支持协程 (Coroutines)。具体来说,不是协程的实现本身,而是协作式多任务 (Cooperative Multitasking)。也就是说,当一个任务(Task)/Socket 在一个线程上开始执行,做了一些工作,然后想要等待某件事——比如等待 Socket 变为可读(readable),或者某个定时器到期——我希望能够将这个任务从线程上卸下,让这个线程去做别的事情。也就是说,我需要 yield 或者 co_await,或者随便你怎么称呼它。

  3. 这又引出了下一个需求:我需要事件支持。想象一下,一个任务在等待它的 Socket 变为可读。我希望能在“Socket 变为可读”这个事件发生时唤醒它,或者在“等待 Socket 可读的定时器超时”这个事件发生时唤醒它。也就是说,我需要能够即时唤醒那些进入休眠状态的任务。这意味着我需要能够对事件做出反应。

所有这些,我先是构思了一个任务调度器(Task Scheduler)的算法,然后用 C++ 实现了它。它的代码其实并不复杂,这好像是我第二次这么说了。因为在那家公司,初级(Junior)开发人员非常多,代码必须非常简单,以便人们能够轻松理解和使用。它的文档非常详尽,注释比代码还多,而且是那种解释“为什么”的注释,而不是“是什么”的注释。

算法概览

现在我将从宏观上向你们展示这个算法,不深入探讨那些超级细节。细节我们可以稍后讨论,因为时间有限。

想象一下,你有一个进程,里面运行着几个线程,它们在做着不同的工作。其中一个线程里,比如,有一个 TCP 服务器在接收 Socket。每个 Socket,它都会包装成一个叫做 Task 的对象。然后,这些 Task 需要以某种方式进入调度器(Scheduler),以便在那里被执行——读取、写入等等。它们将在调度器中沿着一个特定的管道 (Pipeline)流水线 (Conveyor Belt) 一步步前进。

  1. 前端队列 (Front Queue):流水线的第一步是 Task 进入一个前端队列(Front Queue),它们在这里集合并等待处理。

  2. 等待队列 (Waiting Queue):接下来,Task 从前端队列中被取出,检查每个 Task 在等待什么事件,它们的截止时间(deadline)、超时(timeout)等等是什么。然后它们被移动到一个等待队列(Waiting Queue),在这里它们会等待某件事发生。它们会一直待在这里,直到某个事件发生,在这里累积。

  3. 就绪队列 (Ready Queue):同时,在前几轮调度的过程中,等待队列里可能已经有一些 Task 的超时时间到了,或者 Socket 变为可读,或者 Socket 上出现了错误,或者连接成功了。这些 Task 会被从等待队列中取出,与前端队列里那些不需要等待任何东西的蓝色 Task(这种情况有时会发生,比如 Socket 已经是可读或可写的)合并在一起。这一堆 Task 会移动到流水线的第三步——就绪队列(Ready Queue),在这里它们会等待被某个线程执行。

你可以为调度器创建多个工作线程 (Worker Threads),它们会从就绪队列中取出 Task 并执行它们。所谓的“执行”,就是指处理 Socket、调用你的回调函数、设置新的截止时间等等。

执行完毕后,一些 Task 可能会终止(比如 Socket 关闭了,或者它只是一个一次性的回调),而另一些——通常如果是 Socket,你会想继续读取数据——它们会再次回到循环中,通过 Front Queue -> Waiting Queue -> Ready Queue,直到 Socket 关闭或发生其他事情。

总的来说,这就是整个算法。我希望它看起来不那么复杂。

可能你们中的一些人已经开始有疑问了:这些工作线程在执行 Task,那谁在执行调度 (scheduling) 呢? 谁在移动那些矩形框里的小圆圈呢?

这也是由一个线程完成的,但不是一个固定的线程。我非常不希望有一个专门扮演“调度者”角色的线程,因为这会增加延迟,上下文切换等等……解释起来很长。

取而代之的是,其中一个工作线程——实际上是所有工作线程一起——会周期性地竞争成为调度者的权利。你可以把这个调度过程想象成一把锁,这把锁只有一把钥匙。调度器的线程们会周期性地尝试获取这把钥匙。谁拿到了,谁就去执行调度工作,然后去帮助其他线程执行 Task,这取决于哪个线程何时空闲。所以,这是一个非常灵活的机制。即使只有一个线程,调度器也能工作,因为这个线程会先去调度,然后执行 Task,然后再去调度,再执行 Task,如此循环。

让我们看一个更具体的例子。假设我们有5个 Task 想要被执行,有3个工作线程。其中一个线程拿到了调度的钥匙,进入调度器,等待 Task 的到来。Task 还没来。过了一会儿,它们进入了前端队列。这个线程注意到了它们,它被 condition_variable::notify 或者其他方式唤醒了。它开始检查这些 Task,看每个 Task 想要什么、等待什么、要等多久。假设它发现有两个 Task 在等待某些东西,它就把它们放入等待队列。另外三个 Task 已经准备好执行了,它就立即把它们发送到就绪队列。其他两个工作线程已经开始取走它们了,然后这个调度线程也跑过去加入它们,拿起剩下的 Task,一起处理。

过了一段时间,假设有两个 Task 完全完成了,被删除了。突然,等待队列里的 Task 变得就绪了——比如 Socket 变为可读,或者超时了,不管什么原因。但暂时没人注意到,因为调度锁是空的,没人持有它,没有线程在进行调度。但在某个时刻,一个空闲下来的线程会拿起钥匙,跑到调度器里,然后发现:“哦,这里有就绪的 Task!”它会把它们移动到就绪队列,然后其他工作线程会把它们取走执行,或者它自己先跑到那里取走,看谁更快。它们执行完毕,我们就回到了起点。调度器就会这样循环工作,直到你在应用程序结束时销毁它。

唤醒机制 (Wakeup)

另一个有趣的细节。我已经提过几次“唤醒 (wakeup)”这个概念,我需要事件支持来快速唤醒 Task。有时我不仅需要根据“Socket 可读”或“超时”这类事件来唤醒它,有时我需要 просто (just) 做一个 wakeup——也就是把它从等待队列的任意位置拽出来,移动到就绪队列。

想象一下这种情况:你有一个 Task 在处理 HTTP 请求。还有另一个 Task 向第一个 Task 发送了一个 HTTP 请求,并希望在10秒后收到响应时被唤醒。通常响应会先到,所以你需要提前被唤醒,而不是等满10秒,否则延迟会太高。

这个唤醒机制实现起来其实不那么容易。想象一下,等待队列里有很多 Task,比如一百万、一千万个。现在,你需要唤醒其中一个随机的 Task。你必须快速定位到它。你不能在每次调度迭代中都去全扫描(full scan)这几百万个 Task,无论你的等待队列用的是什么数据结构,这都会太慢。并且,假设只有一个线程在做调度,没有一个专门的线程一直坐着全扫描这个队列。

当没有唤醒请求时,调度线程就只是等待这些 Task 中最近的一个截止时间。如果它们都没有截止时间,它可能会无限等待。

现在,假设我想唤醒几个 Task。在这里,我用了一个可以称之为巧妙的算法技巧,也可以称之为“黑科技” (hack)。当我想唤醒某些 Task 时,我向前端队列发送一个特殊的标记 (marker),这个标记总是对应一个具体的 Task。这个标记进入前端队列会唤醒调度线程(同样,通过 condition_variable::notifyeventfd 或任何你喜欢的方式)。调度线程会将这些标记与具体的 Task 匹配上。这些 Task 已经告诉了调度线程它们在等待队列中的位置。调度线程会把它们从那里拽出来,移动到就绪队列,然后去执行它们。

这个操作在最好情况下是对数时间复杂度 (O(log n)),具体取决于你的等待队列用的是什么数据结构。

我不只在显式唤醒时使用这个机制,我还用它来处理网络。例如,我可以在调度器内部添加 epoll、io_uring、IOCP、kqueue。它们会从内核批量地给我发送事件。我可以将这些事件匹配到具体的 Task,然后用同样的方式,通过发送标记来唤醒它们。它就是这样工作的。

实现细节

关于这个算法的实现:

  • 代码少于 2000 行,包括注释(如我所说,注释比代码还多)、空行、括号和其他格式化细节。

  • 它几乎是无锁 (lock-free) 的。为什么是“几乎”,你可以稍后问我。

  • 它对内存要求不高。每个 Task 会花费你少于 100 字节(如果你用的是 Linux 和 epoll)。

  • 非常简单。如我之前提到的,一个要求是初级开发人员必须能看懂,所以它在所有可能的地方都做了最大程度的简化。

  • 一个不错的额外好处是:它经过了形式化验证 (formally verified) 以确保其正确性。这意味着,有一种叫做 TLA+ 的语言,它看起来像数学公式。你在那里用公式描述一个状态机,比如状态集合之间的转换,各种集合关系等等。然后,你用公式描述不变量 (invariants),例如,“算法总是会终止”,“每个 Task 最终都会被执行”,或者“没有 Task 会被执行两次”等等。然后,这个语言的运行时会全扫描 (full scan) 你的算法所有可能的状态,并在每个状态上检查不变量。很多复杂的算法,比如 Raft,就是通过 TLA+ 验证的。也可以用它来验证更简单的东西,比如业务逻辑,不一定非得是底层代码。

支持的配置:

  • 编译器:Clang, GCC, MSVC

  • 平台:macOS, Windows, Linux

  • 网络后端:epoll, io_uring, kqueue, IOCP (暂时没有 aio_uring)

  • 架构:ARM, x86-64 (64-bit)

  • 编译器要求:至少 C++17(如果用我的实现的话)。但如我所说,我认为这里的价值主要在算法,而不是我实现的某些细节,所以这对你可能不那么重要。

开箱即用的功能:

  • 在我的实现中,已经有了 TCP 客户端和服务器。

  • 在此之上还有 SSL/TLS。

  • 原则上,基于这些已经可以构建任何东西了。

代码示例

让我们看看它在代码里是什么样的:这些队列、Task、Socket 的读写。

幸运的是,所有示例也都在开源项目中,你可以自己编译、修改、测试,并把它们作为你应用程序的基础。每个示例下面都有一个链接可以找到它。

1. 最简单的 Task Scheduler

我们从最简单的开始,就是一个 TaskScheduler。我在栈上创建了它,只有一个工作线程。我向它提交了一个 Task。这个 Task 就是一个 lambda 函数,它打印一些东西,然后删除自己。

/* 生成代码,仔细甄别 */

// 示例:最简单的 Task Scheduler
TaskScheduler scheduler(1); // 1 个工作线程
scheduler.post(new Task([](Task* self) {
    std::cout << "Hello from task!" << std::endl;
    delete self;
}));

原则上,不可能有比这更简单的了。我希望到目前为止一切都还清楚。

2. 多步任务 (Callback Hell)

让我们把它弄复杂一点。如果它只是这样,那它就是一个执行回调的线程池。但我本来是希望我的 Task 能够 yield、接收事件等等。

让我们给 Task 设置几个步骤。想象一下,一个 Task 想要发送一个请求 (request),接收一个响应 (response),然后结束。这会是三个独立的步骤。在这些步骤之间,我希望 Task 能做 yield,这样当这个 Task 在等待时,线程可以去做别的 Task

另外,Task 通常会有一些上下文 (context)。例如,如果 Task 是一个用户请求,你会有用户数据,比如 HTTP 头、查询参数、用户 ID 等等。总之,Task 通常有上下文。你可以用不同方式添加上下文,可以通过继承来添加成员变量,也可以像上一个例子那样通过 lambda 捕获来捕获你的上下文,甚至都不用写一个类。

在这个例子里,我想展示用类也可以。这个 Task 的第一个回调是 send_request。让我们看看它执行时会发生什么。

在这个第一步里,为了简单,我没有真的发送请求,只是打印了一些东西,并准备下一步:将这个 Task 的回调函数改成 recv_response,然后通过 front_q 把这个 Task 重新提交回调度器

/* 生成代码,仔细甄别 */

struct MultiStepTask : public Task {
    void send_request() {
        std::cout << "Sending request..." << std::endl;
        set_callback(&MultiStepTask::recv_response);
        scheduler.post(this);
    }

    void recv_response() {
        std::cout << "Receiving response..." << std::endl;
        set_callback(&MultiStepTask::finalize);
        scheduler.post(this);
    }
    
    void finalize() {
        std::cout << "Finalizing." << std::endl;
        delete this;
    }
};

在某个时刻,它会再次被执行,可能在另一个线程上,也可能在同一个线程上,但它让出了位置。在它这两个步骤之间,线程们去做了别的 Task,这就是协作式多任务。

第二步也是一样,打印点东西,设置第三步,把自己提交回调度器。它执行我们的第三步,对我来说这是最后一步。之后你可以对这个 Task 做任何事,可以把它放进池里复用,也可以删除它。原则上,从这一刻起,Task 的所有权又回到了你手上,你可以对它做任何事。

我把它启动到调度器里,这里我甚至是在栈上声明的,这样也可以。它工作了,打印了所有这些东西。

可能有人会觉得刺眼,这些回调函数要设置好几次,这真的看起来不太好。至少我不太喜欢这种所谓的“回调地狱 (callback hell)”。

3. 多步任务 (Coroutines)

幸运的是,在 C++20 中出现了协程(或者说不幸,看它们长什么样)。虽然是无栈 (stackless) 的,但我们仍然可以从中获益。

在这种情况下,我可以摆脱除了一个以外的所有回调。对于不熟悉的人来说,要在 C++ 中创建一个协程,你需要做一些准备工作。你需要定义一个 promise 对象,里面有它自己的一些方法。在我的这个库 serverbox 中,它已经定义好了。我只需要返回它。

在 C++ 中,要让一个函数成为协程,你需要返回一个 promise 对象,并且在函数体内至少使用一次 co_await。然后编译器就会把它变成一个协程。

这里我返回了一个 promise,我把它命名为 coro,我觉得这很合逻辑,协程就该返回一个协程对象,而不是 promise。我使用 co_await 来代替重设回调。本质上,逻辑和之前一样,但我不需要一直调用 set_callback

/* 生成代码,仔细甄别 */

struct CoroutineTask : public Task {
    Promise<void> execute() {
        std::cout << "Sending request..." << std::endl;
        co_await yield{}; // yield 将任务放回队列

        std::cout << "Receiving response..." << std::endl;
        co_await yield{};

        std::cout << "Finalizing." << std::endl;
        delete this;
    }
};

co_await 会把我的 Task 放回 front_q,它会走遍所有队列。在此期间,线程可以去做别的 Task。这当然是线程安全的,不会出现第一步和第二步并行执行的情况。co_await 只是把你暂停,可能会把你移到另一个线程,或者过一会儿在同一个线程上恢复,但整个过程是顺序的。所以,你可以在协程体内使用 task-local 的上下文。同样,你可以删除、重用、池化这个协程。

4. 任务间交互

让我们再把它弄复杂一点。在实践中,至少在我遇到的情况里,Task 之间需要交互,而不是每个 Task 自己在线程里跑来跑去。

让我们做一个这样的例子:有一个 Task,它想为另一个 Task 创建一些异步工作。我希望在工作完成后唤醒第一个 Task,让它处理结果。

我设计了这样一个函数 task_submit_request,它接收一个原始 Task,为它创建工作,并在工作完成后唤醒它。我通过一个临时的 Task 来实现,但你也可以一次性创建这个临时 Task,或者维护一个 Task 池,看你怎么方便。

它的函数体是这样的:它会模拟工作一秒钟,就好像它在做一些真正的工作。当然,我不想在这里用任何 sleep,因为那会阻塞整个线程。取而代之的是,在调度器里,你可以使用自己的截止时间。调度器会把你放在等待队列里一秒钟,当一秒钟过去后,再把你放回就绪队列。

/* 生成代码,仔细甄别 */

// 模拟工作的任务
void worker_task_body(Task* self, Task* original_task) {
    self->set_deadline_sec(1); // 设置1秒后超时
    self->set_callback([original_task](Task* self) {
        // 工作完成,唤醒原始任务
        original_task->post_signal(); 
        delete self;
    });
    scheduler.post_wait(self); // 提交并等待截止时间
}

// 原始任务的协程体
Promise<void> original_task_body() {
    // ...
    submit_request(this); // 提交异步工作

    // 等待结果
    while (!has_result) { // 偏执地防止伪唤醒
        co_await wait_signal{}; // 等待信号
    }
    
    // 处理结果...
}

也就是说,你设置一个截止时间,然后 yield。当截止时间到了,你就会被唤醒。当它被唤醒时,我通过 post_signal 方法向原始 Task 发送一个信号。它会注意到这个信号,醒来,并处理结果,不管结果是什么。

原始 Task 看起来会很简单。它提交工作,然后在一个 while 循环里等待结果。因为我对于伪唤醒 (spurious wakeups) 有着绝对的偏执,我总是把等待操作包装在一个循环里。在这个具体的例子里,本来可以直接做一个 wait,设置一个无限的截止时间,然后 recv_signal,你就在等待队列里等,直到收到信号。然后处理结果,做你想做的。

5. 网络编程与 EvCore

到目前为止,我们还没有看到网络。这个演讲叫“正确处理 Socket”,但我们还没见过 Socket。现在我们准备好来看它们了。

为此,你需要另一个东西,叫做 EvCore。它几乎和 TaskScheduler 一样,但它的 Task 里面有 Socket。从算法上讲,它们几乎是相同的,只是在实现上有一些差异,所以我把它们分成了两个不同的调度器,但算法是一样的。

我用三个工作线程启动它,它们暂时是空的。然后我启动一个异步服务器,它绑定到一个端口,然后被“池化”到 EvCore 中,在那里它会一直存在,并在客户端被接受 (accept) 时传递事件。现在还没有客户端。

现在有了。异步客户端,它们在创建时就立即被发送到 EvCore 中。甚至都不需要保存它们的指针。new 了一个就忘了它。它们会连接到服务器,并在那里做一些事情。然后我等待它们完成工作,最后所有东西都被删除。

让我们从服务器开始。EvCore 的目标是为你传递来自 Socket 的事件。服务器有事件,比如 TCP 服务器会接受客户端,它有一个 on_accept 事件。要接收这类事件,你需要继承自 TcpServerSubscription,也就是你订阅了 TCP 服务器的事件。

然后你需要创建这个服务器。它的构造函数和 Boost 有点像。顺便说一下,这个 EvCoreTask 和 Boost 的 strand 有点像。熟悉的人可能知道,strand 是什么。这些 Socket 几乎就像 strand,只不过 strand 已经内置在它们里面了。我稍后解释这是什么意思。

我们创建这个服务器,首先传给它它将要“生活”在其中的 EvCore。但此时它只是被创建了,还没有真的在那里运行,不传递任何东西。在此之前,你必须把它绑定到一个端口。例如,可以绑定到端口0,这样你可以在它启动之前就知道它实际绑定的端口。

要启动它,你调用 listen()。从这一刻起,你就会开始接收到与这个服务器相关的事件。服务器只有一个主要事件,那就是 accept。相应地,当客户端连接时,你会收到一个 on_accept 事件,里面已经有了 Socket 和对端的地址,你可以随意使用它。

/* 生成代码,仔细甄别 */

// 服务器端
class MyServer : public TcpServerSubscription {
public:
    void on_accept(int client_fd, const sockaddr* addr) override {
        // 创建一个 Peer 来处理这个新连接
        new Peer(ev_core, client_fd);
    }
};

这里的 client_fd 只是一个文件描述符。只是在 Windows 上它看起来不一样,所以我把它做成了一个独立的类型,但本质上它就是个描述符。我处理这个事件的方式就是创建一个 Peer 类,它也异步地坐着,接收或发送来自那个远程客户端的数据。

PeerClient 看起来几乎一样,所以为了简单,我们只看客户端。

它和服务器很像,这里的模式对于所有这些 Socket 都是一样的。你再次需要订阅 Socket 事件,继承自 TcpSocketSubscription。客户端 Socket 的事件会更多一些:on_connecton_connect_erroron_sendon_recvon_send_erroron_recv_error。你通过重载函数来定义你需要哪些事件并处理它们。

我们创建 Socket,传给它它将要生活的 EvCore。打开 Socket 时可以选一些选项,但我这里不需要,就用默认的。然后你选择要连接到哪里。这里我连接到我之前启动服务器的端口,但我原则上可以给 endpoint 传任何域名,它会在 EvCore 里被解析成 IP,然后去连接。但我这里用字符串更简单。

一旦我调用 post_connect,Socket 就飞到 EvCore 里去了,开始在那些线程里生活,并开始向你传递事件。

我这里主要关心三个事件:

  1. on_connect:当连接成功建立时,我会收到 on_connect。我做的第一件事是异步发送一个握手(handshake)字符串,然后异步接收一个字节,作为对端确认收到握手的信号。

  2. on_recv:我等待有东西过来。当有东西过来时,我会收到 on_recv 事件。原则上,如果这里是些数据,我可以读取这个流,解码一些消息,解析 HTTP 请求之类的。在这个例子里,我的逻辑很简单:一旦我收到确认握手已送达,我立即关闭连接。

  3. on_close:关闭也是异步的。因为不幸的是,在某些平台上,直接关闭 Socket 是不够的。例如,在 Windows 上,你不能直接关闭 Socket,因为之后你还会收到异步请求的结果,比如在你关闭之前发起的、但还没完成的 recvsend。或者在 epoll 里,谁不知道的话,直接关闭 Socket 也不好,因为……我们可以之后再谈,它可能会永远留在那个 epoll 实例里,如果你在没等到各种东西的情况下就关闭了它。但无论如何,这个东西是平台无关的,它最终会给你传递一个 on_close 事件,此时清理所有东西就是安全的了。我在这里删除 Socket,也连同 Socket 一起删除这个客户端。

原则上,你可以在这里做重连 (reconnect)。这在实践中其实非常方便。比如,你有一个到数据库的连接,你想在整个应用程序生命周期内都保持它。我见过代码里,人们做的是:连接死了,某个线程注意到了,创建一个新连接,带着各种超时。在这里,你可以直接在客户端内部通过一个定时器来做重连,比如一秒后。你配置好这个,TcpSocket 的 API 里直接就支持,非常方便,大大减少了做重连的代码量。

6. Pipeline: TaskScheduler + EvCore

这基本上就是简单的 TCP。现在我们来试试最真实的场景,这个场景在逻辑上和我最终在那个项目中实际使用的非常相似。

这就是流水线 (Pipeline),在这里 TaskSchedulerEvCore 互相协作。也就是说,EvCore 直接处理网络,与 Socket 打交道,而我们用 TaskScheduler 来处理业务逻辑。这样,我们不会把代码混在一起。我非常讨厌 API 过载,当一个调度器什么都干的时候,它看起来会……总之会很吓人,会像 Boost.Asio 那样。我不想那样,所以我喜欢把东西做得小而简单,然后可以把它们组合起来。

让我们设想这样一个场景:我们有一个代理(proxy)或前端服务器,它接收来自具体用户的请求。请求是算术运算,比如 a + bc * d 之类的。但问题是,我的代理服务器不会算数,它必须把这个请求发送到一个后端计算器服务。为了加两个数,我需要用那个服务。它会把这些请求作为子请求发送到一个隐藏的后端服务器集群。

所以,我有一个用户请求,业务逻辑会发出网络子请求,到数据库或者到我的这个计算器后端。

为了访问计算器后端,我写一个 CalculatorClient。我会把它做得和我之前例子里的 MyClient 类似:TcpSocketSubscriptionon_recvon_send 等等。它的公共 API 会非常简单:接收一个操作、两个参数和一个回调函数,当操作完成时,这个回调函数会被带着结果调用。当然,这一切都是异步的。

我的业务逻辑会是这样的:一个用户请求 (UserRequest),它在创建时会拿到一个 CalculatorClient。可能我整个应用只有一个客户端,或者我有一个客户端池,总之我不想为每个请求都创建一个。它还会拿到它将要生活的 scheduler。你可以有多个调度器。在我的项目里,有一个调度器用于业务请求,一个用于数据库,还有一个用于别的什么,我忘了。但重点是,调度器不是单例 (singleton)。你可以有多个 TaskScheduler,也可以有多个 EvCore,用于不同的事情,这样它们就不会为了 CPU 时间而互相竞争。

然后我立即把这个 Task 提交到调度器。它的主体 (execute) 会非常简单,为了方便,我把它做成了一个协程,这样就不用和回调纠缠了。

/* 生成代码,仔细甄别 */

// UserRequest 的协程体
Promise<void> UserRequest::execute() {
    // 1. 向后端计算器发送请求
    calculator_client->calculate(op, a, b, [this](int result) {
        // 3. 收到结果,保存并唤醒自己
        this->result = result;
        this->post_signal();
    });

    // 2. 等待结果
    while (!has_result) {
        co_await wait_signal{};
    }

    // 4. 处理结果...
    // ... 然后删除自己或重用
}

函数体是这样的:第一件事就是向后端计算器服务器发送请求,传递一些参数和一个回调。然后等待请求完成。再说一遍,我非常害怕伪唤醒,所以我通过一个循环来做,坐着等,直到从我的回调里收到信号。在此期间,这个线程可以去做别的 Task,因为我 yield 了。

当响应回来时,我的完成回调会被调用,我在里面保存结果,把它藏到某个地方,然后唤醒我的原始 Task。然后它会处理这个结果,并把自己删除,或者重用,或者再发一个请求等等。

这样我们就得到了 TaskSchedulerEvCore 的一种共生关系。

  • 第一个我们用于业务逻辑,它为此非常简单。

  • EvCore 我们用于网络操作,在它里面网络操作也非常简单。

从算法上讲,它们是相同的。当然 EvCore 会稍微重一点,因为它要处理 Socket,TaskScheduler 里没有 Socket。也就是说,EvCore 还需要维护那个 epoll、kqueue 或者在对应平台上使用的东西。另外,EvCore 不支持协程,因为它的 Task 没有“主体”,只有你可以响应的事件。其他方面,它们在算法上几乎是一样的。

基准测试 (Benchmarks)

让我们看看基准测试。

我在这些基准测试中的一个主要目标,或者说主要目标之一,是让它容易执行。因为当我要花很长时间准备运行基准测试、编译、启动、手动比较结果时,我感到非常烦躁。我希望只用一行命令就能完成,然后得到一份带结果的报告。

我在这里成功做到了。不仅如此,基准测试还是比较性的。也就是说,我个人觉得非常方便的是,当我在调度器里修改了某些东西后,我可以运行一个基准测试,用旧的调度器对阵新的,或者旧的 EvCore 对阵新的,在一系列场景下,然后得到一个结果,比如:“没什么变化”,“快了两倍”,“慢了50%”之类的。当然,用户也可以这样做,但最初的目标是为了最大程度地简化我自己的开发。

特别是,我可以比较不同的实现。也就是说,我可以把 TaskScheduler 和某个只有一个队列和一个互斥锁 (mutex) 的线程池进行比较,或者把 EvCore 和 Boost.Asio 进行比较。让我们看看我比较出了什么。

Task Scheduler 基准测试

对于 TaskScheduler,我没怎么找到一个像这样纯粹的、没有 Socket 的调度器的对标物。所以我自己写了一个非常简单的线程池,就是字面上的 std::thread 池,执行回调,一个队列,一个互斥锁。没有 Waiting Queue,没有 Ready Queue,没有所有这些开销。我很好奇,我加了所有这些逻辑,到底增加了多少开销。

结果是这样的:

  • 场景一:空任务。 回调什么都不做。我想测试竞争 (contention)。当任务是空的时候,线程会在就绪队列上不停地撞头,你会得到就绪队列上最大的竞争,最大的冲突,缓存行失效等等。

  • 这里我们得到了有趣的结果。基本上,在50个线程时,有超过100倍的加速。线程池的性能退化到了每秒几万个任务,而 TaskScheduler 保持在数百万。

  • 场景二:更真实的场景。 Task 还是会做点事情,而不是创建了就被删除。我做了这样一个场景:Task 忙几微秒,然后做 yield,或者被删除,或者被重新调度等等。

  • 结果变得不那么惊人了,但仍然比普通的线程池快很多。

但这都是没有网络的,所以我们再看看网络基准测试。它们要复杂一些,可以想出更多的场景。我这里只有一个场景,但它的可配置性非常强。

网络基准测试 (EvCore vs. Boost.Asio)

场景模板是这样的:你有一个 echo 服务器。客户端发送一条消息,服务器解析它,重新打包,然后原样发回。

实际上,用这个简单的模板你可以衡量很多东西。比如,你可以改变消息的大小,就当它是一个数据块,解码是瞬间的,但它会占用大量网络带宽。或者你可以让消息里有很多很多字段,解码会很慢,这样你就可以模拟比如 SSL 解码很慢的情况。或者你可以一条接一条地发消息,也可以一次性发100条在途 (in-flight),让管道里一直保持有100条消息。这些东西都可以调整。

另外,我们现在有了很多平台:Linux 有自己的 epoll 和 io_uring,Windows 有 IOCP,macOS 有 kqueue。我们在这里对所有这些可能的组合都做了基准测试。它们都可以在线以漂亮的 Markdown 报告形式查看,顺便说一下,运行基准测试的工具也会为你生成这些报告。

我们只看几个,因为时间不多。

  • Linux + epoll: 我们看到,我试了单线程、多线程、大消息 (100KB)、小消息 (128B)。在所有情况下,Boost 都严重落后。尽管我非常努力地想让它变得高效。我用的是当时最新的 Boost 1.84,大概是去年的,或者最多是前年的,但它相当新。然而结果就是这样。

  • Linux + io_uring: 我们新的希望,它应该能加速一切。结果相似。虽然可能有人已经注意到了,它的性能为什么比 epoll 还低。我们稍后会回到这个问题。

  • Windows + IOCP: 结果也相似。不过,并非总是 EvCore 更快。有一个场景,Boost.Asio 稍微快一点。我们可以在最后讨论这个。

  • macOS + kqueue: 再次是相似的结果。不知为何,Boost 也没那么快。

更多基准测试可以在链接里找到。主要目标当然是,我运行了这些基准测试,它们告诉你一些东西,但它在你自己的硬件上会怎么工作呢?如果我看到这个,它对我来说什么也说明不了。所以,这些基准测试,理想情况下,如果有人感兴趣,最好在自己的硬件上运行一下,看看你个人能得到什么样的性能。例如,那台 Mac,那台笔记本已经有11、12年历史了,我在上面运行测试,结果不是很具代表性。

关于 epoll vs. io_uring

在演讲快结束时,关于 epoll vs. io_uring。

最初我做这些的时候,我并不支持 io_uring。我觉得没什么意义。但后来有人说服我,它那么快,没有系统调用(syscall)。如果有人不知道,epoll 是这样的:你把 Socket 注册到一个 epoll 队列里,然后内核会批量地给你发送来自很多很多 Socket 的事件。然后你单独处理每个事件。比如内核告诉你,这个 Socket 可读了,这个可写了。然后你必须为 sendrecvconnect(如果是异步连接)等单独做一次系统调用。也就是说,有很多系统调用。

在 Meltdown 漏洞之后的世界里,系统调用突然变得相当昂贵,尽管可能以前上下文切换就很贵了。

io_uring 是这样的东西,Windows 其实早就发明了类似的东西,至少 API 看起来和 IOCP 很像。你不是注册 Socket,而是把任何可以处理文件描述符的系统调用,推送到一个用户空间的队列里,但内核知道这个队列。比如,你往队列里塞了一千个 send 操作,它们是一些描述操作的结构体,比如 sendrecv。然后用一次系统调用,你把这一切都推送到内核,它用这一次系统调用就能执行那一千个 send

这听起来多酷啊!甚至可以完全没有系统调用。可以开启 io_uring 的一种模式,内核会启动一个线程,持续轮询 (poll) 这个队列,这样你就可以在没有系统调用的情况下处理 Socket 了。

然而,不知为何,我的测试结果是 io_uring 比 epoll 慢了两倍,尽管它的系统调用少了上百倍。我用的是最新的 Linux 内核,最新的 liburing 库。我看到原生的 io_uring API,我决定甚至不去尝试用它,它的 API 太可怕了。但 liburing 是一个完全可用的包装器。不知为何速度是这样。

而且不只是我。后来我去深挖,发现了很多报告说,io_uring 并不比 epoll 快,除非你把 epoll 阉割得很厉害,那样 io_uring 才会突然大放异彩。但这没意思。

我想邀请那些知道为什么会这样的朋友来讨论一下,因为我只有一些理论,但关于它们在内核中的实现到底有什么不同,我没有答案。所以,这是一个有趣的话题,也许有人懂。这里这么多人,肯定有人知道。

谢谢大家关注!我们可以进入提问和讨论环节了。希望你们觉得有趣。

问答环节 (Q&A)

提问者1: 谢谢你的演讲。我想知道,比如说,我看了基准测试结果,但不是很明显,这个解决方案在不同数量的物理核心、不同数量的客户端下的扩展性如何?因为那里的结果只限制在5个线程,我认为这不是很具代表性的样本。

Vladislav: 这个东西在1万个客户端下测试过,是的,我们为生产环境准备过。据我所知,在我做这个的公司里,它至今仍在那里运行。它确实能扩展,也就是说,它仍然比(竞品)快。扩展性几乎是线性的,只要你的线程数等于或小于核心数。但为了在幻灯片上简化,基准测试做得简单一些了。

提问者1: 好的。那么,你有没有想过和 Go 语言的解决方案比较一下?

Vladislav: Go 我甚至没试过。我只是……我也很好奇,试着比较过。我有一个基于 Boost 的解决方案,我能从中榨出的最大性能是 Go 能给出的 90-95%。

Vladislav: 嗯,是的,是这样。虽然我个人没比较过,但我得提醒一下,这当然取决于具体场景。完全有可能,你可以找出一个场景,Go 会快得多,或者(我的方案)不知为何会快得多。要看具体情况。是的,如果像这样在真空中比较百分比,那我的解决方案会比 Go 上的类似实现要快。但在 Go 里,你其实控制不了太多东西,它有内置的 goroutine、yield,它自己的操作调度器,可能在某种形式上和我现在讲的以及 Boost.Asio 类似。你没有太多调整的余地。是的,就是这样。看具体情况会更有趣。

提问者2: 我还有一个关于这个 API 设计的小问题。让我很困惑的是,连接内部资源的生命周期管理,似乎是通过 newdelete 来控制的,而且 delete 是在用户侧的 on_close 方法里调用的。你有没有想过,比如,如果一个资源真的比连接活得更长,它应该由外部来控制。而如果资源已经和连接的生命周期绑定了,那在 on_close 里原则上就不需要控制任何东西了。

Vladislav: 首先,在 on_close 之后,有时需要做重连。所以我不能 просто (just) 直接删除 Task。但有时我可以,当不需要重连的时候。但我不知道应用程序里会怎么样,所以不能默认总是静默删除。其次,有时你想要删除一些你自己的上下文。我不喜欢把所有东西都塞进析构函数里。我喜欢更明确地写这类东西。但原则上,这可能都可以通过包装器、引用计数之类的来做。但我喜欢更明确的代码。所以我的实现就成了这样。

提问者2: 好的,谢谢你的演讲。