1. thread pool 简介

MariaDB 共有三种线程调度方式

  • one-thread-per-connection 每个连接一个线程

  • no-threads 所有连接共用一个线程

  • pool-of-threads 线程池

no-threads 只适用于简单的系统,并发数稍高性能就会严重下降

one-thread-per-connection 在多数情况下性能优良,是个合适的选择,生产系统也常用此配置。但在高并发、短连接的业务场景下,使用 one-thread-per-connection 会频繁得创建和销毁线程,严重影响性能

pool-of-threads 适用于高并发短连接的业务场景,线程复用,避免频繁创建和销毁线程带来的性能损耗

MariaDB 的 thread pool 在 win 和 unix 系统的实现不同,本文分析 unix 系统下的实现

thread pool 由若干个 thread_group 组成,每个 thread_group 有若干个 worker 线程,和 0~1 个 listenr 线程

server 接收到连接请求时,将这个连接分配给一个 group 处理,listener 线程负责监听请求,worker 线程处理请求内容

2.代码概览

struct scheduler_functions 内部声明了连接建立相关的属性和方法,这里使用函数指针实现多态

每种线程调度方式,分别实例化一个struct scheduler_functions,给相关变量和函数指针赋值

thread pool 相关实现在 sql/threadpool_common.cc 和 sql/threadpool_unix.cc 中

  1. sql/scheduler.h
  2. struct scheduler_functions
  3. {
  4. uint max_threads, *connection_count;
  5. ulong *max_connections;
  6. bool (*init)(void);
  7. bool (*init_new_connection_thread)(void);
  8. void (*add_connection)(THD *thd);
  9. void (*thd_wait_begin)(THD *thd, int wait_type);
  10. void (*thd_wait_end)(THD *thd);
  11. void (*post_kill_notification)(THD *thd);
  12. bool (*end_thread)(THD *thd, bool cache_thread);
  13. void (*end)(void);
  14. };
  15. -------------------------------------------------
  16. sql/threadpool_common.cc
  17. static scheduler_functions tp_scheduler_functions=
  18. {
  19. 0, // max_threads
  20. NULL,
  21. NULL,
  22. tp_init, // init
  23. NULL, // init_new_connection_thread
  24. tp_add_connection, // add_connection
  25. tp_wait_begin, // thd_wait_begin
  26. tp_wait_end, // thd_wait_end
  27. post_kill_notification, // post_kill_notification
  28. NULL, // end_thread
  29. tp_end // end
  30. };
  • tp_init

初始化 thread_group

  1. main
  2. mysqld_main
  3. network_init
  4. /* 调用相应 thread_scheduler 下的 init 函数,这里是 tp_init */
  5. MYSQL_CALLBACK_ELSE(thread_scheduler, init, (), 0)
  6. tp_init
  7. /* thread_group_t 申请内存 */
  8. all_groups= (thread_group_t *)
  9. my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
  10. /* 初始化 thread_group_t 数组 */
  11. for (uint i= 0; i < threadpool_max_size; i++)
  12. {
  13. thread_group_init(&all_groups[i], get_connection_attrib());
  14. }
  15. /* 设置最大执行时间,worker thread */
  16. pool_timer.tick_interval= threadpool_stall_limit;
  17. /* 开启 timer thread*/
  18. start_timer(&pool_timer);
  • tp_add_connection

创建 connection,将 connection 分配到一个 thread_group,插入 group 队列

唤醒或创建一个 worker thread,如果 group 中没有活跃的 worker thread

  1. main
  2. mysqld_main
  3. handle_connections_sockets
  4. create_new_thread
  5. /* 调用相应 thread_scheduler 下的 add_connection 函数,这里是 tp_add_connection */
  6. MYSQL_CALLBACK(thd->scheduler, add_connection, (thd));
  7. tp_add_connection
  8. /* 申请一个 connection */
  9. connection_t *connection= alloc_connection(thd);
  10. /* round-robin 映射到一个 group */
  11. thread_group_t *group=
  12. &all_groups[thd->thread_id%group_count];
  13. queue_put(group, connection)
  14. /* connection 插入队列 */
  15. thread_group->queue.push_back(connection);
  16. /*
  17. 如果没有活跃 woker 线程,唤醒一个处理新连接
  18. 此时连接尚未 login,认证等工作在 worker 线程中完成
  19. */
  20. if (thread_group->active_thread_count == 0)
  21. wake_or_create_thread(thread_group);
  22. wake_or_create_thread
  23. /* 取出 thread_group->waiting_threads 中的 waiting thread (如有) */
  24. if (wake_thread(thread_group) == 0)
  25. DBUG_RETURN(0);
  26. /*
  27. 没有活跃线程时,立即创建一个,在其他地方也有这个逻辑
  28. 保证每个 thread_group 只是存在一个活跃线程
  29. */
  30. if (thread_group->active_thread_count == 0)
  31. DBUG_RETURN(create_worker(thread_group));
  32. /*
  33. 对创建新的 woker thread 限流,距离上个线程创建间隔一定时间才允许再次创建 worker thread
  34. 这个 group 线程总数越多,要求的间隔越长
  35. 4线程以下间隔为0,大于等于 4/8/16 线程时,要求间隔分别为 50*1000/100*1000/200*1000 微秒
  36. */
  37. if (time_since_last_thread_created >
  38. microsecond_throttling_interval(thread_group))
  39. {
  40. DBUG_RETURN(create_worker(thread_group));
  41. }
  • worker_main

从 queue 中取出和处理 event

在 group 没有 listener 时会变成 listener

没取到 event 时,进入休眠状态,等待被唤醒

最后进入休眠状态的 worker thread 最先被唤醒

  1. worker_main
  2. for(;;)
  3. {
  4. connection = get_event(&this_thread, thread_group, &ts);
  5. /* 没有 event 时,跳出循环,结束 worker thread */
  6. if (!connection)
  7. break;
  8. handle_event(connection);
  9. }
  10. get_event
  11. for(;;)
  12. {
  13. /*
  14. 取出 tp_add_connection/listener 插入的 connection
  15. */
  16. connection = queue_get(thread_group);
  17. /* 取到 connection,跳出循环,进入下一步 handle_event */
  18. if(connection)
  19. break;
  20. /* 如果没有 listener thread,这个线程变为 listener */
  21. if(!thread_group->listener)
  22. {
  23. /* listener 变为 worker 处理一个 connection */
  24. connection = listener(current_thread, thread_group);
  25. /* listener 已变为 worker */
  26. thread_group->listener= NULL;
  27. /* 跳出循环,进入 handle_event 处理 connection */
  28. break;
  29. }
  30. /* 进入休眠状态前,尝试非阻塞方式获取一个 connection */
  31. if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
  32. {
  33. thread_group->io_event_count++;
  34. connection = (connection_t *)native_event_get_userdata(&nev);
  35. break;
  36. }
  37. /*
  38. 进入休眠状态
  39. 最后休眠的 worker thread 最先被唤醒,更容易命中 cache
  40. */
  41. current_thread->woken = false;
  42. thread_group->waiting_threads.push_front(current_thread);
  43. /* 等待被唤醒 */
  44. mysql_cond_wait
  45. }
  46. /* 返回获取到的 connection */
  47. DBUG_RETURN(connection);
  48. handle_event
  49. if (!connection->logged_in)
  50. {
  51. /* login connection */
  52. err= threadpool_add_connection(connection->thd);
  53. login_connection
  54. connection->logged_in= true;
  55. }
  56. else
  57. {
  58. /* 处理 connection 请求 */
  59. err= threadpool_process_request(connection->thd);
  60. do_command
  61. }
  62. /*
  63. 告诉客户端可以读
  64. 可能会变动 connection 所属的 group
  65. */
  66. err= start_io(connection);
  67. /*
  68. group_count 允许动态改变,所以处理 connection 的 group 可能发生变动
  69. 这里检查 group 是否需要变动
  70. */
  71. thread_group_t *group =
  72. &all_groups[connection->thd->thread_id%group_count];
  73. if (group != connection->thread_group)
  74. change_group(connection, connection->thread_group, group)
  75. return io_poll_start_read(group->pollfd, fd, connection);
  • listener

listener 线程通过 epoll_wait 监听 group 关联的描述符,epoll使用一个文件描述符管理多个描述符

监听获得的 event 插入队列,如果插入前队列为空,listener 变成 worker 线程处理第一个event,其余插入队列,否则所有 event 插入队列

如果 group 中没有活跃线程,唤醒或者创建一个 worker 线程

这里考虑一种情况,如果监听只获得一个 event 且队列为空,那么这个 event 将会被 listener 处理,队列中不会插入新的 event,此时只需要 listener 变成 worker 线程处理 event,不需要再唤醒其他 worker 线程

  1. listener
  2. for(;;)
  3. {
  4. /* 监听事件 */
  5. cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
  6. /* 如果队列为空,listener 处理第一个事件,一定概率(只有一个事件)可以不再唤醒 worker thread */
  7. bool listener_picks_event= thread_group->queue.is_empty();
  8. /* 第一个事件留给 listener 处理,其余放入队列,或者全部放入队列 */
  9. for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
  10. {
  11. connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
  12. thread_group->queue.push_back(c);
  13. }
  14. if (listener_picks_event)
  15. {
  16. /* Handle the first event. */
  17. retval= (connection_t *)native_event_get_userdata(&ev[0]);
  18. mysql_mutex_unlock(&thread_group->mutex);
  19. break;
  20. }
  21. /*
  22. 队列中已经有一些 event,但是没有活跃线程
  23. 唤醒一个 worker 线程处理队列中 event
  24. 唤醒失败,并且仅有一个线程(仅有listener),创建一个 worker 线程
  25. */
  26. if(thread_group->active_thread_count==0)
  27. {
  28. if(wake_thread(thread_group))
  29. {
  30. if(thread_group->thread_count == 1)
  31. create_worker(thread_group);
  32. }
  33. }
  34. }
  35. DBUG_RETURN(retval);
  • timer_thread

检查 listener、worker thread 的运行情况和 connection 超时情况,每隔 thread_pool_stall_limit 检查一次

如果 listener 不存在,且检查周期内没有监听到新的 event,则创建一个 worker thread(自动变成 listener)

如果没有活跃线程(运行状态的 worker thread),且检查周期内没有处理 event,创建一个 worker thread

关闭长时间空闲的 connection

  1. timer_thread
  2. for(;;)
  3. {
  4. /*
  5. 等待一个 tick_interval,即 thread_pool_stall_limit
  6. 正常情况下回等待至超时,只有 stop_timer 会发出 timer->cond 信号
  7. */
  8. set_timespec_nsec(ts,timer->tick_interval*1000000);
  9. err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
  10. /* ETIMEDOUT 等待超时,代表 timer 没有被 shutdown */
  11. if (err == ETIMEDOUT)
  12. {
  13. timer->current_microtime= microsecond_interval_timer();
  14. /* 遍历 thread_group,查看是否 stall */
  15. for (i= 0; i < threadpool_max_size; i++)
  16. {
  17. if(all_groups[i].connection_count)
  18. check_stall(&all_groups[i]);
  19. }
  20. /* 关闭空闲连接 */
  21. if (timer->next_timeout_check <= timer->current_microtime)
  22. timeout_check(timer);
  23. }
  24. }
  25. check_stall
  26. /*
  27. thread_group->io_event_count 代表插入队列的 event 数量
  28. thread_group 没有 listener,且没有 event 在队列中,这个检查周期内 group 没有监听连接信息
  29. 唤醒或者创建一个 worker 线程,这个线程会自动成为 listener
  30. */
  31. if (!thread_group->listener && !thread_group->io_event_count)
  32. {
  33. wake_or_create_thread(thread_group);
  34. return;
  35. }
  36. /* Reset io event count */
  37. thread_group->io_event_count= 0;
  38. /*
  39. thread_group->queue_event_count 代表已经出队的 event 数量
  40. 队列非空,并且出队 event 数量为0,这个检查周期内 worker thread 没有处理 event
  41. 唤醒或创建一个 worker thread, thread_group 标记为 stalled, 下个 event 出队时 reset stalled标记
  42. */
  43. if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
  44. {
  45. thread_group->stalled= true;
  46. wake_or_create_thread(thread_group);
  47. }
  48. /* Reset queue event count */
  49. thread_group->queue_event_count= 0;
  • tp_wait_begin/tp_wait_end

线程进入阻塞状态前/阻塞状态结束后,分别调用者两个函数汇报状态,用于维护 active_thread_count 即活跃线程数量

tp_wait_begin 将活跃线程数 -1,活跃线程为 0 时,唤醒或创建一个线程

tp_wait_eng 将活跃线程数 +1

这里需要注意,不是所有的等待都需要调用 tp_wait_begin,预期内短时间结束的等待,比如 mutex,可以不调用 tp_wait_begin

行锁或者磁盘IO这种长时间的等待,需要调用 tp_wait_begin,汇报这个线程暂时无法处理请求,需要开启新的线程

线程池和连接池

线程池是在 server 端的优化,避免频繁的创建和销毁线程

连接池是在 client 端的优化,减少连接创建时间,节省资源

连接池和线程池是两个不同的概念,可以同时使用