bthread调度策略探索
Contents
bthread是brpc使用的M:N线程库,目的是在提高程序的并发度的同时,降低编码难度,并在核数日益增多的CPU上提供更好的scalability和cache locality。
bthread的M:N模型是将M个bthread映射到N个pthread,M会远大于N。如下图所示:
协程一般也是将多个用户线程运行在单个内核线程,但他们是N:1的关系,不同内核线程上的用户线程不共享,并且如果一个线程阻塞会导致其他线程都处于等待状态,无法利用多核。bthread的M:N模型以及steal_work的调度机制可以更好的利用多核。
Bthread的整体架构
bthread 的整体结构如下:
bthread的整体架构分为TaskControl
、TaskGroup
和ParkingLot
,其中 TaskControl 为全局唯一的实例用于管理所有的TaskGroup,;TaskGroup 主要是协调 bthread 的调度,它和一个pthread 进行绑定;ParkingLot 是 bthread 调度的信号量,内部使用futex实现,用于TaskGroup的等待和唤醒。
Bthread调度策略
1. TaskControl
TaskControl 是一个全局唯一的实例,当有新的bthread加入的时候,会从 TaskControl 里面选取一个 TaskGroup,并将该bthread加入到TaskGroup中,如果 TaskControl 还未初始化 TaskGroup,则开始创建:
1 | _workers.resize(_concurrency); |
其中 _concurrency 为指定的 workers 数量,brpc 中默认为 (8+epoll_num),创建的workers线程具体执行:
1 | TaskControl* c = static_cast<TaskControl*>(arg); |
每个 worker 首先创建一个 TaskGroup,并执行该 task_group 的run_main_task。
2. TaskGroup
TaskGroup 主要用于协调bthread的调度执行,入口为run_main_task:
1 | void TaskGroup::run_main_task() { |
不断的 wait_task, 等到 bthread 后,即进入 sched_to 开始调度该 bthread。
1 | void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) { |
sched_to 主要的步骤即把当前的bthread的所保存的上下文内容(利用_boost::context _生成的)加载到当前的线程栈中,并跳转到bthread的执行位置开始执行。当前的 bthread 执行完成后,会把之前停留的一些任务执行完,然后进入ending_sched:
1 | void TaskGroup::ending_sched(TaskGroup** pg) { |
ending_sched 主要是当前bthread执行完后会优先从本地获取下一个bthread,如果没有则从其他taskgroup偷取bthread来执行,获取成功后又重新调度。如果没有获取到bthread则进入main_task开始等待。整体流程如下:
3. ParkingLot
ParkingLot 作为 TaskGroup 获取 bthread 的信号量,内部使用futex实现。
1 | // Wake up at most `num_task' workers. |
parking_lot 的等待与唤醒都是利用系统调用 futex_wait 和 futex_wake 来实现,futex 的优势在于 wait 时会先判断当前的pending_signal 是否和 expected_state 的值相等,如果不相等则直接返回,不必进入内核态,减少开销。具体原理参考futex — Linux manual page。为了减少竞争,在bthread中默认有4个pariking_lot, 每个TaskGroup会与其中一个parking_lot对应。
1 | _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) % TaskControl::PARKING_LOPART_NUM]; |
每个task_group会根据当前的pthread_numeric_id来分配parking_lot,这样在一定程度上会减少一定的竞争。
1 | static const int PARKING_LOT_NUM = 4; |
PARKING_LOPART_NUM 在 bthread 中为常量,不可调节。PARKING_LOT_NUM 不同值在不同的情况下会不会成为瓶颈有待验证。当有新的bthread加入的时候,如果设置了signal参数,则会调用TaskControl的signal_task唤醒任务:
1 | void TaskControl::signal_task(int num_task) { |
signal_task 用于唤醒等待的 workers,为了减少无用的唤醒,会做一个限制,每次唤醒的次数最多2个,如果2个都未唤醒成功,并且设置了FLAGS_bthread_min_concurrency, 则会加入新的 worker 以满足当前的并发量。
4. StealTask
在 TaskGroup 中有两个用于存储 bthread 的队列:
1 | WorkStealingQueue<bthread_t> _rq; |
其中 WorkStealingQueue 用于存储由 task_group(worker) 自身生成的 bthread,RemoteTaskQueue 用于存储非 task_group 生成的bthread,因为 WorkStealingQueue 中的 push 和 pop 只会被本 worker 调用,仅与 steal 产生竞争,所以使用无锁实现减少锁的开销。RemoteTaskQueue 由于是由非 worker 调用,且是随机选择task_group, 所以使用锁的方式实现。StealTask 是 brpc 里调度bthread 的策略,即可以从其他 task_group(worker) 的队列里面去拿任务,是实现 $M:N$ 模型的方法。
被唤醒时会触发steal_task
worker wait 的前提是本身队列没有任务,且未能偷到任务,所以当唤醒时,会触发 steal_task 去拿任务1
2
3
4
5_pl->wait(_last_pl_state);
LOG(FATAL) << "start steal task";
if (steal_task(tid)) {
return true;
}ending_sched时可能触发steal_task
1
2
3
4
5
6
7
8
9
const bool popped = g->_rq.pop(&next_tid);
const bool popped = g->_rq.steal(&next_tid);
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}如果本 taskgroup 的 _rq 没有任务时,就会触发 steal_task。task_group 的 steal_task 会首先从本地的 _remote_rq 取任务,如果没有则会从其他 task_group 拿任务。
1 | bool steal_task(bthread_t* tid) { |
通过 task_control 这个管理者实现从其他 task_group 拿任务
1 | bool stolen = false; |
即从随机位置开始遍历,每次跨度 offset,遍历 ngroup 次,先从其他 task_group 的 working_stealing_queue 里面取,其次从remote_rq 取,取到任务后立即返回。
从上面分析中可以看出,如果开启很多task_group后,每个task_group任务消耗完后会产生大量的steal_task调用。如果工作的负载处于波动的情况下,处于波峰的时候会拉起很多task_group工作,到达波谷时task_group会发生大量的steal_work,而如果负载一直处于较低或较高的状态,steal_work调用会显著降低。
5. 两种bthread启动方式
在 bthrea d中有两种方式开启新的 bthread:
bthread_start_background
1
2
3
4
5
6
7
8
9
10
11int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// start from worker
return g->start_background<false>(tid, attr, fn, arg);
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}如果创建这个bthread的调用者是外部的线程(非task_group),调用start_from_non_worker。
start_from_non_worker的工作就是从task_control随机选择一个task_group作为这个新bthread的归宿。如果创建者是已经存在于task_group的bthread则会以当前的task_group作为最终归宿。
task_group中的start_background做的工作就是为新的bthread创建资源TaskMeta,然后根据上面两种情况分别加入不同的对列,情况1 加入 remote_rq, 情况2 加入 _rq 。1
2
3
4
5if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
}bthread_start_urgent
如果调用者是外部的线程则和上面分析一致,如果调用者是已经存在的task_group 的 bthread,则会调用当前 task_group 的start_foreground1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26int TaskGroup::start_foreground(TaskGroup** pg,
bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
......
// initialize task meta
if (g->is_current_pthread_task()) {
g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
// NOSIGNAL affects current task, not the new task.
RemainedFn fn = NULL;
if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
} else {
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = {
g->current_tid(),
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
};
g->set_remained(fn, &args);
TaskGroup::sched_to(pg, m->tid);
}
return 0;
}首先也是初始化 taskmeta,用于后续的调度,由于是 start_foreground,当前的task如果还未完成,则会在后续调用read_ro_run_in_worker,将当前未完成的 bthread 加入队列中,该逻辑封装在 RemainedFn 中。然后调用sched_to 调度当前新的 bthread。新的bthread会被包裹在 task_runner 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
TaskGroup* g = tls_task_group;
if (!skip_remained) {
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
--g->_sched_recursive_guard;
}
do {
......
void* thread_return;
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}
// Group is probably changed
g = tls_task_group;
// TODO: Save thread_return
(void)thread_return;
......
g->_control->_nbthreads << -1;
g->set_remained(TaskGroup::_release_last_context, m);
ending_sched(&g);
} while (g->_cur_meta->tid != g->_main_tid);
}首先就是调用 remain 函数(上面的将未完成的bthread加入队列),接着完成当前 bthread 具体的函数(fn),最终进入 ending_sched 调度下一个任务直到没有任务,然后进入 main_tid。
Bthread在BRPC中的应用
BRPC 是百度开源的一款 rpc 框架,内部使用了Bthread的线程模型brpc 的流程大致如下:
EventDispatcher 主要利用 epoll 监听读写事件,并分发事件到相应的处理handler。
1 | void InitializeGlobalDispatchers() { |
初始化时,会开启 event_dispatcher_num(默认为1)个disaptcher,运行在 bthread 中。
1 | for (int i = 0; i < n; ++i) { |
不断的读取发生的读写事件并交由处理函数处理。
如果是发生读事件,会抢占当前 epoll 所在 task_group 的执行权限,会调用bthread_start_ugrent来执行读事件。
1 | int Socket::StartInputEvent(SocketId id, uint32_t events, |
bthread_start_urgent 会调用 bthread 中的start_foreground,当前正在执行的 bthread 会被挂起到 _rq 队列中(可以被其他 task_group steal_task ),读事件 handler 的 btrhead 会被调度到当前task_group中(发生上下文切换 )。在brpc中读事件一般分为两种:
新的请求连接(OnNewConnections)
1
2
3
4
5
6
7
8
9
10
11
12SocketId socket_id;
SocketOptions options;
options.keytable_pool = am->_keytable_pool;
options.fd = in_fd;
options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
options.user = acception->user();
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
options.initial_ssl_ctx = am->_ssl_ctx;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
continue;
}获得远端的 socket 的信息并创建新的 socket_id, 并将读事件的handler 设置为 OnNewMessages
读socket上的message (OnNewMessages)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37void InputMessenger::OnNewMessages(Socket* m) {
InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
const InputMessageHandler* handlers = messenger->_handlers;
int progress = Socket::PROGRESS_INIT;
while (!read_eof) {
......
// Read.
const ssize_t nr = m->DoRead(once_read);
......
m->AddInputBytes(nr);
while (1) {
size_t index = 8888;
ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
......
m->AddInputMessages(1);
......
DestroyingPtr<InputMessageBase> msg(pr.message());
QueueMessage(last_msg.release(), &num_bthread_created,
m->_keytable_pool);
if (handlers[index].process == NULL) {
LOG(ERROR) << "process of index=" << index << " is NULL";
continue;
}
m->ReAddress(&msg->_socket);
m->PostponeEOF();
msg->_process = handlers[index].process;
msg->_arg = handlers[index].arg;
......
if (num_bthread_created) {
bthread_flush();
}
}
if (read_eof) {
m->SetEOF();
}
}OnNewMessages 主要是从 sokcet 上读取消息,解析消息(协议、内容等),并且将要处理的任务(用户逻辑代码)先放入队列中(QueueMessage,生成bthread,但是不signal),最终会将解析到的多个请求一并 flush(signal 所有未被signal的任务),减少不必要的开销。
当用户处理完逻辑后,会向soket发起写事件。
1 | int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) { |
将写事件封装为WriteRequest请求,并加入到链表中,然后开始StartWrite,StartWrite的逻辑就是不断的将buffer中的数据写完到socket中,如果在当前bthread中能够写完数据则成功返回,否则则加入keepwrite中,开启bthread继续写,直到写完数据。
Author: DongSheng
Link: http://ehds.github.io/2021/07/20/bthread_schedule/
License: 知识共享署名-非商业性使用 4.0 国际许可协议