bthread是brpc使用的M:N线程库,目的是在提高程序的并发度的同时,降低编码难度,并在核数日益增多的CPU上提供更好的scalability和cache locality。

bthread的M:N模型是将M个bthread映射到N个pthread,M会远大于N。如下图所示:

协程一般也是将多个用户线程运行在单个内核线程,但他们是N:1的关系,不同内核线程上的用户线程不共享,并且如果一个线程阻塞会导致其他线程都处于等待状态,无法利用多核。bthread的M:N模型以及steal_work的调度机制可以更好的利用多核。

Bthread的整体架构

bthread 的整体结构如下:

bthread的整体架构分为TaskControlTaskGroupParkingLot,其中 TaskControl 为全局唯一的实例用于管理所有的TaskGroup,;TaskGroup 主要是协调 bthread 的调度,它和一个pthread 进行绑定;ParkingLot 是 bthread 调度的信号量,内部使用futex实现,用于TaskGroup的等待和唤醒。

Bthread调度策略

1. TaskControl

TaskControl 是一个全局唯一的实例,当有新的bthread加入的时候,会从 TaskControl 里面选取一个 TaskGroup,并将该bthread加入到TaskGroup中,如果 TaskControl 还未初始化 TaskGroup,则开始创建:

1
2
3
4
5
6
7
8
_workers.resize(_concurrency);   
for (int i = 0; i < _concurrency; ++i) {
const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
if (rc) {
LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
return -1;
}
}

其中 _concurrency 为指定的 workers 数量,brpc 中默认为 (8+epoll_num),创建的workers线程具体执行:

1
2
3
4
5
6
7
TaskControl* c = static_cast<TaskControl*>(arg);
TaskGroup* g = c->create_group();
TaskStatistics stat;
.....
tls_task_group = g;
c->_nworkers << 1;
g->run_main_task();

每个 worker 首先创建一个 TaskGroup,并执行该 task_group 的run_main_task。

2. TaskGroup

TaskGroup 主要用于协调bthread的调度执行,入口为run_main_task:

1
2
3
4
5
6
7
8
9
10
11
12
13
void TaskGroup::run_main_task() {
......
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
......
}

不断的 wait_task, 等到 bthread 后,即进入 sched_to 开始调度该 bthread。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
......
TaskMeta* const cur_meta = g->_cur_meta;
......
if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
jump_stack(cur_meta->stack, next_meta->stack);
// probably went to another group, need to assign g again.
g = tls_task_group;
}
#ifndef NDEBUG
else {
// else pthread_task is switching to another pthread_task, sc
// can only equal when they're both _main_stack
CHECK(cur_meta->stack == g->_main_stack);
}
#endif
}
// else because of ending_sched(including pthread_task->pthread_task)
}
......

while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
......
}

sched_to 主要的步骤即把当前的bthread的所保存的上下文内容(利用_boost::context _生成的)加载到当前的线程栈中,并跳转到bthread的执行位置开始执行。当前的 bthread 执行完成后,会把之前停留的一些任务执行完,然后进入ending_sched:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
.......
// getting next bthread from _rq
const bool popped = g->_rq.pop(&next_tid);
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}
TaskMeta* const cur_meta = g->_cur_meta;
TaskMeta* next_meta = address_meta(next_tid);

if (next_meta->stack == NULL) {
......
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
......
}
sched_to(pg, next_meta);
}

ending_sched 主要是当前bthread执行完后会优先从本地获取下一个bthread,如果没有则从其他taskgroup偷取bthread来执行,获取成功后又重新调度。如果没有获取到bthread则进入main_task开始等待。整体流程如下:

3. ParkingLot

ParkingLot 作为 TaskGroup 获取 bthread 的信号量,内部使用futex实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Wake up at most `num_task' workers.
// Returns #workers woken up.
int signal(int num_task) {
_pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
return futex_wake_private(&_pending_signal, num_task);
}

// Get a state for later wait().
State get_state() {
return _pending_signal.load(butil::memory_order_acquire);
}

// Wait for tasks.
// If the `expected_state' does not match, wait() may finish directly.
void wait(const State& expected_state) {
//LOG(FATAL) << _pending_signal << expected_state.val ;
futex_wait_private(&_pending_signal, expected_state.val, NULL);
}

parking_lot 的等待与唤醒都是利用系统调用 futex_wait 和 futex_wake 来实现,futex 的优势在于 wait 时会先判断当前的pending_signal 是否和 expected_state 的值相等,如果不相等则直接返回,不必进入内核态,减少开销。具体原理参考futex — Linux manual page。为了减少竞争,在bthread中默认有4个pariking_lot, 每个TaskGroup会与其中一个parking_lot对应。

1
_pl = &c->_pl[butil::fmix64(pthread_numeric_id()) % TaskControl::PARKING_LOPART_NUM];

每个task_group会根据当前的pthread_numeric_id来分配parking_lot,这样在一定程度上会减少一定的竞争。

1
2
static const int PARKING_LOT_NUM = 4;
ParkingLot _pl[PARKING_LOT_NUM];

PARKING_LOPART_NUM 在 bthread 中为常量,不可调节。PARKING_LOT_NUM 不同值在不同的情况下会不会成为瓶颈有待验证。当有新的bthread加入的时候,如果设置了signal参数,则会调用TaskControl的signal_task唤醒任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void TaskControl::signal_task(int num_task) {
if (num_task <= 0) {
return;
}
// TODO(gejun): Current algorithm does not guarantee enough threads will
// be created to match caller's requests. But in another side, there's also
// many useless signalings according to current impl. Capping the concurrency
// is a good balance between performance and timeliness of scheduling.

if (num_task > 2) {
num_task = 2;
}
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= _pl[start_index].signal(1);
if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= _pl[start_index].signal(1);
}
}
if (num_task > 0 &&
FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
add_workers(1);
}
}
}

signal_task 用于唤醒等待的 workers,为了减少无用的唤醒,会做一个限制,每次唤醒的次数最多2个,如果2个都未唤醒成功,并且设置了FLAGS_bthread_min_concurrency, 则会加入新的 worker 以满足当前的并发量。

4. StealTask

在 TaskGroup 中有两个用于存储 bthread 的队列:

1
2
WorkStealingQueue<bthread_t> _rq;
RemoteTaskQueue _remote_rq;

其中 WorkStealingQueue 用于存储由 task_group(worker) 自身生成的 bthread,RemoteTaskQueue 用于存储非 task_group 生成的bthread,因为 WorkStealingQueue 中的 push 和 pop 只会被本 worker 调用,仅与 steal 产生竞争,所以使用无锁实现减少锁的开销。RemoteTaskQueue 由于是由非 worker 调用,且是随机选择task_group, 所以使用锁的方式实现。StealTask 是 brpc 里调度bthread 的策略,即可以从其他 task_group(worker) 的队列里面去拿任务,是实现 $M:N$ 模型的方法。

  1. 被唤醒时会触发steal_task
    worker wait 的前提是本身队列没有任务,且未能偷到任务,所以当唤醒时,会触发 steal_task 去拿任务

    1
    2
    3
    4
    5
    _pl->wait(_last_pl_state);
    LOG(FATAL) << "start steal task";
    if (steal_task(tid)) {
    return true;
    }
  2. ending_sched时可能触发steal_task

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #ifndef BTHREAD_FAIR_WSQ
    const bool popped = g->_rq.pop(&next_tid);
    #else
    const bool popped = g->_rq.steal(&next_tid);
    #endif
    if (!popped && !g->steal_task(&next_tid)) {
    // Jump to main task if there's no task to run.
    next_tid = g->_main_tid;
    }

    如果本 taskgroup 的 _rq 没有任务时,就会触发 steal_task。task_group 的 steal_task 会首先从本地的 _remote_rq 取任务,如果没有则会从其他 task_group 拿任务。

1
2
3
4
5
6
7
8
9
 bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
return true;
}
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
_last_pl_state = _pl->get_state();
#endif
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}

通过 task_control 这个管理者实现从其他 task_group 拿任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}

即从随机位置开始遍历,每次跨度 offset,遍历 ngroup 次,先从其他 task_group 的 working_stealing_queue 里面取,其次从remote_rq 取,取到任务后立即返回。

从上面分析中可以看出,如果开启很多task_group后,每个task_group任务消耗完后会产生大量的steal_task调用。如果工作的负载处于波动的情况下,处于波峰的时候会拉起很多task_group工作,到达波谷时task_group会发生大量的steal_work,而如果负载一直处于较低或较高的状态,steal_work调用会显著降低。

5. 两种bthread启动方式

在 bthrea d中有两种方式开启新的 bthread:

  1. bthread_start_background

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    int bthread_start_background(bthread_t* __restrict tid,
    const bthread_attr_t* __restrict attr,
    void * (*fn)(void*),
    void* __restrict arg) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
    // start from worker
    return g->start_background<false>(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
    }
  2. 如果创建这个bthread的调用者是外部的线程(非task_group),调用start_from_non_worker。
    start_from_non_worker的工作就是从task_control随机选择一个task_group作为这个新bthread的归宿。

  3. 如果创建者是已经存在于task_group的bthread则会以当前的task_group作为最终归宿。
    task_group中的start_background做的工作就是为新的bthread创建资源TaskMeta,然后根据上面两种情况分别加入不同的对列,情况1 加入 remote_rq, 情况2 加入 _rq 。

    1
    2
    3
    4
    5
    if (REMOTE) {
    ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
    ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
  4. bthread_start_urgent
    如果调用者是外部的线程则和上面分析一致,如果调用者是已经存在的task_group 的 bthread,则会调用当前 task_group 的start_foreground

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    int TaskGroup::start_foreground(TaskGroup** pg,
    bthread_t* __restrict th,
    const bthread_attr_t* __restrict attr,
    void * (*fn)(void*),
    void* __restrict arg) {
    ......
    // initialize task meta
    if (g->is_current_pthread_task()) {
    g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
    // NOSIGNAL affects current task, not the new task.
    RemainedFn fn = NULL;
    if (g->current_task()->about_to_quit) {
    fn = ready_to_run_in_worker_ignoresignal;
    } else {
    fn = ready_to_run_in_worker;
    }
    ReadyToRunArgs args = {
    g->current_tid(),
    (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
    };
    g->set_remained(fn, &args);
    TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
    }

    首先也是初始化 taskmeta,用于后续的调度,由于是 start_foreground,当前的task如果还未完成,则会在后续调用read_ro_run_in_worker,将当前未完成的 bthread 加入队列中,该逻辑封装在 RemainedFn 中。然后调用sched_to 调度当前新的 bthread。新的bthread会被包裹在 task_runner 中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    // different groups.
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
    while (g->_last_context_remained) {
    RemainedFn fn = g->_last_context_remained;
    g->_last_context_remained = NULL;
    fn(g->_last_context_remained_arg);
    g = tls_task_group;
    }

    #ifndef NDEBUG
    --g->_sched_recursive_guard;
    #endif
    }

    do {
    ......
    void* thread_return;
    try {
    thread_return = m->fn(m->arg);
    } catch (ExitException& e) {
    thread_return = e.value();
    }

    // Group is probably changed
    g = tls_task_group;

    // TODO: Save thread_return
    (void)thread_return;

    ......
    g->_control->_nbthreads << -1;
    g->set_remained(TaskGroup::_release_last_context, m);
    ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);
    }

    首先就是调用 remain 函数(上面的将未完成的bthread加入队列),接着完成当前 bthread 具体的函数(fn),最终进入 ending_sched 调度下一个任务直到没有任务,然后进入 main_tid。

Bthread在BRPC中的应用

BRPC 是百度开源的一款 rpc 框架,内部使用了Bthread的线程模型brpc 的流程大致如下:

EventDispatcher 主要利用 epoll 监听读写事件,并分发事件到相应的处理handler。

1
2
3
4
5
6
7
8
9
10
11
void InitializeGlobalDispatchers() {
g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
CHECK_EQ(0, g_edisp[i].Start(&attr));
}
// This atexit is will be run before g_task_control.stop() because above
// Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}

初始化时,会开启 event_dispatcher_num(默认为1)个disaptcher,运行在 bthread 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
#ifdef BRPC_SOCKET_HAS_EOF
|| (e[i].events & has_epollrdhup)
#endif
) {
// We don't care about the return value.
Socket::StartInputEvent(e[i].data.u64, e[i].events,
_consumer_thread_attr);
}
}
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
// We don't care about the return value.
Socket::HandleEpollOut(e[i].data.u64);
}
......
}

不断的读取发生的读写事件并交由处理函数处理。
如果是发生读事件,会抢占当前 epoll 所在 task_group 的执行权限,会调用bthread_start_ugrent来执行读事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int Socket::StartInputEvent(SocketId id, uint32_t events,
const bthread_attr_t& thread_attr) {
SocketUniquePtr s;
if (Address(id, &s) < 0) {
return -1;
}
if (NULL == s->_on_edge_triggered_events) {
return 0;
}
......
if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
g_vars->neventthread << 1;

bthread_t tid;
// transfer ownership as well, don't use s anymore!
Socket* const p = s.release();

bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
LOG(FATAL) << "Fail to start ProcessEvent";
ProcessEvent(p);
}
}
return 0;
}

bthread_start_urgent 会调用 bthread 中的start_foreground,当前正在执行的 bthread 会被挂起到 _rq 队列中(可以被其他 task_group steal_task ),读事件 handler 的 btrhead 会被调度到当前task_group中(发生上下文切换 )。在brpc中读事件一般分为两种:

  1. 新的请求连接(OnNewConnections)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    SocketId socket_id;
    SocketOptions options;
    options.keytable_pool = am->_keytable_pool;
    options.fd = in_fd;
    options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
    options.user = acception->user();
    options.on_edge_triggered_events = InputMessenger::OnNewMessages;
    options.initial_ssl_ctx = am->_ssl_ctx;
    if (Socket::Create(options, &socket_id) != 0) {
    LOG(ERROR) << "Fail to create Socket";
    continue;
    }

    获得远端的 socket 的信息并创建新的 socket_id, 并将读事件的handler 设置为 OnNewMessages

  2. 读socket上的message (OnNewMessages)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    void InputMessenger::OnNewMessages(Socket* m) {
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;
    while (!read_eof) {

    ......
    // Read.
    const ssize_t nr = m->DoRead(once_read);
    ......
    m->AddInputBytes(nr);
    while (1) {
    size_t index = 8888;
    ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
    ......
    m->AddInputMessages(1);
    ......
    DestroyingPtr<InputMessageBase> msg(pr.message());
    QueueMessage(last_msg.release(), &num_bthread_created,
    m->_keytable_pool);
    if (handlers[index].process == NULL) {
    LOG(ERROR) << "process of index=" << index << " is NULL";
    continue;
    }
    m->ReAddress(&msg->_socket);
    m->PostponeEOF();
    msg->_process = handlers[index].process;
    msg->_arg = handlers[index].arg;
    ......
    if (num_bthread_created) {
    bthread_flush();
    }
    }
    if (read_eof) {
    m->SetEOF();
    }
    }

    OnNewMessages 主要是从 sokcet 上读取消息,解析消息(协议、内容等),并且将要处理的任务(用户逻辑代码)先放入队列中(QueueMessage,生成bthread,但是不signal),最终会将解析到的多个请求一并 flush(signal 所有未被signal的任务),减少不必要的开销。
    当用户处理完逻辑后,会向soket发起写事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
WriteOptions opt;
if (options_in) {
opt = *options_in;
}
if (data->empty()) {
return SetError(opt.id_wait, EINVAL);
}
WriteRequest* req = butil::get_object<WriteRequest>();
if (!req) {
return SetError(opt.id_wait, ENOMEM);
}

req->data.swap(*data);
// Set `req->next' to UNCONNECTED so that the KeepWrite thread will
// wait until it points to a valid WriteRequest or NULL.
req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(
opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
return StartWrite(req, opt);
}

将写事件封装为WriteRequest请求,并加入到链表中,然后开始StartWrite,StartWrite的逻辑就是不断的将buffer中的数据写完到socket中,如果在当前bthread中能够写完数据则成功返回,否则则加入keepwrite中,开启bthread继续写,直到写完数据。