从线程池模型看 Oceanbase 线程池的应用和实现

1. 线程池的基本原理和组件

线程池的定义:A thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program. Client can send “work” to the pool and somehow this work gets done without blocking the main thread. It’s efficient because threads are not initialized each time we want the work to be done. Threads are initialized once and remain inactive until some work has to be done.

三个基本要点:

  • 任务队列

  • 线程池调度

  • 所有任务结束后唤醒主线程

image-20240113191409986

一个高质量的C实现线程池:https://github.com/Pithikos/C-Thread-Pool

1.1 线程池需要向外提供的必要接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1.初始化
threadpool thpool_init(int num_threads);

// 2.构造job/task,封装成函数调用,压入任务队列
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);

// 3.等待所有任务执行完成,both in queue and currently running
void thpool_wait(threadpool);

// 4.暂停所有线程,无论它们处在空闲还是工作
void thpool_pause(threadpool);

// 5.恢复:从暂停中恢复所以线程原本的工作
void thpool_resume(threadpool);

// 6.销毁线程池,如果有线程还在运行,等待它工作完毕
void thpool_destroy(threadpool);

// 7.返回正在工作的线程数量
int thpool_num_threads_working(threadpool);
1.2 用到的数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 任务:本质上是一个个由工作函数构成的链表结点
typedef struct job{
struct job* prev;
void (*function)(void* arg);
void* arg;
} job;

// 任务队列:可以通过链表实现,在并发场景下,需要增加读写锁、信号量
typedef struct jobqueue{
pthread_mutex_t rwmutex;
job *front;
job *rear;
bsem *has_jobs; // 这里设置一个布尔的信号量,用于各个线程等待还是执行
int len;
} jobqueue;

// 线程:基于pthread
typedef struct thread{
int id;
pthread_t pthread;
struct thpool_* thpool_p;
} thread;

// 线程池
typedef struct thpool_{
thread** threads; // 由指针组成的数组
volatile int num_threads_alive;
volatile int num_threads_working;
pthread_mutex_t thcount_lock; // 互斥锁,用于主线程计算线程数
pthread_cond_t threads_all_idle;
jobqueue jobqueue;
} thpool_;
1.3 重要接口的内部实现

线程池部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct thpool_* thpool_init(int num_threads){
// 1.Make new thread pool
// 2.Initialise the job queue
// 3.malloc threads in pool
// 4.each thread init, 每个线程调用thread_init(..)开始执行
}

int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
// 1.malloc new job
// 2.jobqueue_push(job)
}

void thpool_wait(thpool_* thpool_p){
// 1.lock the thcount_lock
// 2.若任务队列不为空或有线程正在执行任务,循环等待
// 同时设置条件变量 threads_all_idle,由于每个线程都会跟踪线程池
// 只要正在工作的线程数=0,将调用 pthread_cond_signal 通知 thpool_wait
// 3.unlock the thcount_lock
}


线程部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int thread_init (...){
// 1.malloc new thread
// 2.create the thread, use do_task(args) to finish the work
pthread_create(.., .., do_task, args);
// 3.detach the child thread and main thread
pthread_detach(..)
}

static void* do_task(struct thread* thread_p){
// 1.Register signal handler
// 2.Mark thread as alive (initialized)
lock the thcount_lock, and let num_threads_alive++, unlock
// 3.while loop, execute the task
lock the thcount_lock, and let num_threads_runnning++, unlock
job* j = jobqueue_pull();
j.func(args);
lock the thcount_lock, and let num_threads_runnning--, unlock
// 4. if num_threads_runnning == 0, call the main thread
// main thread will keep the thpool_wait()
pthread_cond_signal(&thpool_p->threads_all_idle);
// 5. Mark thread as finish
lock the thcount_lock, and let num_threads_alive--, unlock
}

2. Oceanbase创建线程池并启动任务

1. 使用线程组创建函数注册一个线程池

线程池注册函数:TG_DEF(id, name, type, args…)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
lib::TGDefIDs::id 将参数id传入作用域,获取该id对应的函数编号
宏定义的一些语法:A##B:连接AB字符串,#取字符串
*/
#define TG_DEF(id, name, type, args...) \
lib::create_funcs_[lib::TGDefIDs::id] = []() { \
auto ret = OB_NEW(TG_##type, SET_USE_500("tg"), args); \
ret->attr_ = {#name, TGType::type}; \
return ret; \
};
#include "share/ob_thread_define.h"
#undef TG_DEF

// type类别
enum class TGType
{
INVALID,
REENTRANT_THREAD_POOL,
THREAD_POOL,
TIMER,
QUEUE_THREAD,
DEDUP_QUEUE,
ASYNC_TASK_QUEUE,
MAP_QUEUE_THREAD
};

// 比如注册一个 ParallelDDLPool 线程池
TG_DEF(ParallelDDLPool, ParallelDDLPool, QUEUE_THREAD, 16, 2000)

为每个注册了的线程池赋予一个系统tg_id: lib::TGDefIDs::{id}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// #define TG_DEF(id, ...) id, 映射整个TG_DEF() 取其中id
// 相当于给各个id赋予一个编号,比如:enum {OB_START=-1, ParallelDDLPool, OB_END}
namespace lib {
namespace TGDefIDs {
enum OBTGDefIDEnum
{
OB_START = TGDefIDs::LIB_END - 1,
#define TG_DEF(id, ...) id,
#include "share/ob_thread_define.h"
#undef TG_DEF
OB_END,
};
}
}

2. 创建线程池类(基于 TGRunnableTGTaskHandler 接口)

  • TGRunnable:适合处理无参数允许的任务,比如开启一个定时器等;
    • 纯虚函数:virtual void run1() = 0;
  • TGTaskHandler:适合处理任何任务
    • 纯虚函数:virtual void handle(void *task) = 0;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ObParallelDDLOperator : public lib::TGTaskHandler {
public:
ObParallelDDLOperator();
~ObParallelDDLOperator();

public:
static int init();
virtual void handle(void *task) override;
static int push_task(void *task);
void stop();
void wait();
static void destroy();
static ObParallelDDLOperator &get_instance() { // 单例模式
return parallel_ddl_;
}
static inline ObArenaAllocator &get_allocator() {
return parallel_ddl_.allocator_;
}

private:
int tg_id_;
bool is_inited_;
ObArenaAllocator allocator_;
static ObParallelDDLOperator parallel_ddl_;
};
2.1 线程池的初始化

1、根据已注册线程池的id,生成系统 tg_id,同时映射上 <tg_id,tg_queue_thread*>

2、将已实现的基于 TGTaskHandler 的 parallel_ddl_ 交给线程池,并调用 tg_queue_thread->start();

3、为方便后面调试线程数量,将线程数解耦放入 global context,可在配置文件中自定义。

1
2
3
4
5
6
int ObParallelDDLOperator::init() 
{
TG_CREATE(lib::TGDefIDs::ParallelDDLPool, parallel_ddl_.tg_id_);
TG_SET_HANDLER_AND_START(parallel_ddl_.tg_id_, parallel_ddl_);
TG_SET_THREAD_CNT(parallel_ddl_.tg_id_, GCTX.parallel_ddl_thread_num_);
}
2.2 线程池的任务队列接口
1
2
3
4
int ObParallelDDLOperator::push_task(void *task) 
{
TG_PUSH_TASK(parallel_ddl_.tg_id_, task);
}
2.3 每个线程完成任务的接口

具体的任务工作可由task对应类型的实现类的 do_task() 接口完成

1
2
3
4
5
6
7
8
9
void ObParallelDDLOperator::handle(void *task) 
{
ObIDDLOperatorTask *ddl_task = static_cast<ObIDDLOperatorTask*>(task);
if (xxx == ddl_task->get_type()) {
auto *trans_task = static_cast<ObTransEndDDLOperatorTask*>(ddl_task);
trans_task->do_task();
}
free(ddl_task);
}

3. 定义 Task 基类,实现各类型 Task 需要完成操作的接口

ob决赛中有这样一个小情景:在创建系统租户的过程中,需要完成 1650 张系统表的创建,

在原本的实现中采用的是一张一张表的创建,效率非常低,因此我们的思路是通过创建若干线程建表,

在实际的实现过程中,我们发现每张表都在使用事务进行创建,而事务创建和销毁也有开销;

因此,我们最终的方案是通过 batch create 来实现,每个 batch 由一个事务构成。

1
2
3
4
5
// 基类
class ObIDDLOperatorTask {
public:
virtual ObDDLOperatorTaskType get_type() = 0;
};
3.1 实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 分批并行执行 create_table
class ObCreateTableBatchDDLOperatorTask : public ObDDLOperatorTask {
public:
ObCreateTableBatchDDLOperatorTask(ObDDLService *ddl_service, uint64_t tenant_id, CallbackFunc callback, ObIArray<ObTableSchema> &schemas, int64_t begin, int64_t end);
virtual ~ObCreateTableBatchDDLOperatorTask() override;
virtual int do_task() override;
virtual int do_task(ObDDLSQLTransaction *trans) override;
virtual ObDDLOperatorTaskType get_type() override;

private:
CallbackFunc callback_;
ObSArray<ObTableSchema> schemas_;
int64_t begin_;
int64_t end_;
};

构造函数:将任务所需的所有参数传入

回调函数:在这里调用的是下面的主线程原子操作:ObBootstrap::inc_finish_ddl_task_cnt(),目的是每创建一个任务,主线程记录的任务数量 + 1,最后主线程只需通过返回的 get_finish_ddl_task_cnt() 来判断是否继续进行下去。

话说,这里其实可以使用条件变量来实现,不用这么麻烦(之后有时间再说)

do_task():综合各个参数,创建事务,执行建表指令

3.2 在服务逻辑中构造任务队列,使用线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 1.单例模式,先获取线程池
ObParallelDDLOperator &parallel_ddl_operator = ObParallelDDLOperator::get_instance();

// 2.划分数据的区间,每个区间构造一个任务,push进线程池
for (int64_t i = 0; i < batch_count; ++i) {
int64_t left = i * batch_size;
int64_t right = left + batch_size - 1;
if (right >= table_schemas.count()) {
right = table_schemas.count()-1;
}

// push the task into thread pool
ObDDLOperatorTask *task = new ObCreateTableBatchDDLOperatorTask(&ddl_service, OB_SYS_TENANT_ID, &ObBootstrap::inc_finish_ddl_task_cnt, table_schemas, left, right + 1);
parallel_ddl_operator.push_task(task);
ObBootstrap::inc_start_ddl_task_cnt();
}

使用原子操作等待所有任务的执行完毕,主线程才将继续

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 等待所有任务执行完毕
const int64_t start_time = ObTimeUtility::current_time();
while (OB_SUCC(ret) && ObBootstrap::get_finish_ddl_task_cnt() < ObBootstrap::get_start_ddl_task_cnt()) {
if (ObTimeUtility::current_time() - start_time > WAIT_CREATE_SCHEMAS_TIMEOUT_US) {
ret = OB_TIMEOUT;
break;
}
}

// 几个主线程等待的原子操作
int64_t ObBootstrap::finish_ddl_task_cnt = 0;

void ObBootstrap::inc_finish_ddl_task_cnt()
{
ATOMIC_INC(&finish_ddl_task_cnt);
}
int64_t ObBootstrap::get_finish_ddl_task_cnt()
{
return ATOMIC_LOAD(&finish_ddl_task_cnt);
}

3. oceanbase线程池的实现原理

在前面的例子里, 以 tg_queue_thread 为例,从上层向底层探索:

上层的线程池类型实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 1. 使用ITG作为基类,设置基本的接口
class ITG {
virtual int thread_cnt() = 0;
virtual int set_thread_cnt(int64_t) = 0;
virtual int start() = 0;
virtual void stop() = 0;
virtual void wait() = 0;
virtual int push_task(void *task);
virtual int set_handler(TGTaskHandler &handler);
virtual int wait_task(const common::ObTimerTask &task);
}

// 2.作为一种线程池实现类型,为上层提供接口:set_handler(TGTaskHandler &handler)
// 与 TG_THREAD_POOL 的区别在于,后者接口主要服务于无参数的任务:TGRunnable
class TG_QUEUE_THREAD : public ITG
{
public:
TG_QUEUE_THREAD(ThreadCountPair pair, const int64_t task_num_limit);
~TG_QUEUE_THREAD() { destroy(); }
int thread_cnt() override { return (int)thread_num_; }

int set_handler(TGTaskHandler &handler)
{
int ret = common::OB_SUCCESS;
uint64_t tenant_id = get_tenant_id();
if (qth_ != nullptr) {
ret = common::OB_ERR_UNEXPECTED;
} else {
qth_ = new (buf_) MySimpleThreadPool();
qth_->handler_ = &handler;
qth_->set_run_wrapper(tg_helper_);
ret = qth_->init(thread_num_, task_num_limit_, attr_.name_, tenant_id);
}
return ret;
}

int start() override
{
int ret = common::OB_SUCCESS;
return ret;
}

void wait() override
{
if (qth_ != nullptr) {
qth_->wait();
destroy();
}
}

int push_task(void *task) override
{
int ret = common::OB_SUCCESS;
if (OB_ISNULL(qth_)) {
ret = common::OB_ERR_UNEXPECTED;
} else {
ret = qth_->push(task);
}
return ret;
}

private:
char buf_[sizeof(MySimpleThreadPool)];
MySimpleThreadPool *qth_ = nullptr; // 继承自 ObSimpleThreadPool
int64_t thread_num_;
int64_t task_num_limit_;
};

继续往下分析,ObSimpleThreadPool 是整个ob线程池实现模型的核心环节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
init:初始化元数据,开启线程池;
*/
int ObSimpleThreadPool::init(...)
{
queue_.init(task_num_limit, name, tenant_id);
set_thread_count(thread_num);
lib::ThreadPool::start();
}

void ObSimpleThreadPool::run1()
{
/*
使用 ObAdaptiveStrategy 策略,并使用 ATOMIC_STORE 原子存储 total_thread_num_ 和 active_thread_num_;
如果当前有线程空闲,对任务队列执行pop操作,用上层传入的handler执行这个任务;
比如:ObParallelDDLOperator有用户实现的自定义handler,执行具体的任务。
*/
}

继续往下,分析线程池中线程的调度:

每个线程其实封装了worker,在当前线程的生命周期内,可以通过线程本地存储TLS获取到这个worker对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
int Threads::start() {
// 1.在检查租户上下文完备后,内存分配一个 threads_ 动态数组
// 2.每一个thread使用 Threads::create_thread -> Thread::start()
}

void Threads::run(int64_t idx)
{
ObTLTaGuard ta_guard(GET_TENANT_ID() ?:OB_SERVER_TENANT_ID);
thread_idx_ = static_cast<uint64_t>(idx);
Worker worker;
Worker::set_worker_to_thread_local(&worker);
run1();
}

最底层线程的实现,本质就是调用 POSIX 中关于 pthread 的各类接口,并做一定的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
pthread_create(
pthread_t pointer,
attr of new thread,
the main execute function pointer,
args which will send to main function)
*/
int Thread::start() {
pret = pthread_create(&pth_, &attr, __th_start, this);
ATOMIC_FAA(&total_thread_count_, -1);
}

void Thread::dump_pth() // 可以用于 debug pthread join failed

void Thread::wait() {
pthread_join(pth_, nullptr);
}

void* Thread::__th_start(void *arg) {
// 主要工作:设置线程上下文、元数据,并调用 th->run(); 执行 Threads::run
}

4. 复习:

4.1 条件变量 condition variable

作用:条件变量通过将等待线程挂起,直到条件满足主线程才继续,而不是通过一直循环检查条件,从而可以有效避免忙等,节省 CPU 资源。

1、多线程中用于线程间同步与通信的一种机制;

2、允许一个线程A休眠,直到等待另一个线程B满足特定条件时,A才再继续执行。

3、在不用条件变量时,线程A等待线程B满足一个条件C时,若C迟迟不能满足,A将一直使用CPU资源进行轮询,直到看到满足条件C时,不再轮询,A继续执行:这导致空耗CPU资源。

4.2 信号量
1
2
3
4
5
6
/* Binary semaphore */
typedef struct bsem {
pthread_mutex_t mutex;
pthread_cond_t cond;
int v;
} bsem;

基本操作:

1
2
3
4
5
6
7
8
9
10
static void  bsem_init(struct bsem *bsem_p, int value);
static void bsem_reset(struct bsem *bsem_p);

// V操作:设置信号量为1,通知其他线程,有资源可用
static void bsem_post(struct bsem *bsem_p);
static void bsem_post_all(struct bsem *bsem_p);

// P操作:如果信号量为 1,表示资源可用,线程将 v=v-1,继续执行
// 如果为 0,表示资源不可用,线程阻塞等待条件变量
static void bsem_wait(struct bsem *bsem_p);

Reference:

[1] https://github.com/mtrebi/thread-pool

[2] https://github.com/Pithikos/C-Thread-Pool

[3] https://open.oceanbase.com/train?questionId=600006


从线程池模型看 Oceanbase 线程池的应用和实现
https://yanghy233.github.io/2024/01/16/thread-pool-learning/
作者
yanghy
发布于
2024年1月16日
许可协议