重要数据结构

image.png

  1. Rpl_info 的基类,保存了一些错误信息,如 IO/SQL thread last error
  2. class Slave_reporting_capability
  3. {
  4. // 获取last error
  5. Error const& last_error() const { return m_last_error; }
  6. }
  7. Master_info Relay_log_info 的基类,很多重要的锁和信号量都在这里
  8. class Rpl_info : public Slave_reporting_capability
  9. {
  10. /*
  11. 为了避免死锁,需要按照以下顺序加锁
  12. run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
  13. run_lock, sleep_lock
  14. run_lock, info_thd_lock
  15. info_thd_lock 保护对 info_thd 的操作
  16. 读操作需要获取 info_thd_lock 或 run_lock
  17. 写操作需要获取 info_thd_lock 和 run_lock
  18. data_lock : 保护对数据的读写
  19. run_lock : 保护运行状态,变量 slave_running, slave_run_id
  20. sleep_lock: 对slave_sleep做互斥,防止同时进入多个slave_sleep
  21. */
  22. mysql_mutex_t data_lock, run_lock, sleep_lock, info_thd_lock;
  23. /*
  24. data_cond: data_lock 保护的数据被修改时发出此信号楼,只有 Relay_log_info 会使用
  25. start_cond: sql thread start (start slave 先启动 io thread,后启动 sql thread,sql thread 启动代表全部启动成功)
  26. stop_cond: sql/io thread stop
  27. sleep_cond: slave被kill,目前只发现5.6中有应用,5.7 中没找到发出此信号量的代码
  28. */
  29. mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
  30. /*
  31. 关联 io/sql/worker thread 的 thd
  32. Master_info 对应 io thread
  33. Relay_log_info 对应 sql thread
  34. Slave_worker 对应 worker thread
  35. */
  36. THD *info_thd;
  37. /* 已初始化 */
  38. bool inited;
  39. /* 已终止slave */
  40. volatile bool abort_slave;
  41. /*
  42. 当前slave运行状态,io thread 即 mi->slave_running有三种状态
  43. MYSQL_SLAVE_NOT_RUN 0
  44. MYSQL_SLAVE_RUN_NOT_CONNECT 1
  45. MYSQL_SLAVE_RUN_CONNECT 2
  46. sql thread 即 mi->slave_running 只有 0 1两种状态
  47. */
  48. volatile uint slave_running;
  49. /* 用于判断slave thread 是否启动的变量,每次启动时递增1 */
  50. volatile ulong slave_run_id;
  51. /* repository's handler */
  52. Rpl_info_handler *handler;
  53. /*
  54. 唯一标识一条 info 记录的 id,标识一行或一个文件
  55. Master_info 和 Relay_log_info 可以通过 channel 标识唯一
  56. Worker_info 需要通过 {id, channel} 标识唯一
  57. */
  58. uint internal_id;
  59. /* 通道名,多源复制使用 */
  60. char channel[CHANNEL_NAME_LENGTH+1];
  61. /* 实现了两个虚函数,读写 Rpl_info */
  62. virtual bool read_info(Rpl_info_handler *from)= 0;
  63. virtual bool write_info(Rpl_info_handler *to)= 0;
  64. }
  65. /*
  66. Master_info 用户 IO thread
  67. 主要保存以下信息:
  68. 连接到Master的用户信息
  69. 当前 master log name
  70. 当前 master log offset
  71. 一些其他控制变量
  72. Master_info 读取 master.info repository 初始化,通常是表或文件
  73. 通过函数 mi_init_info() 进行初始化
  74. 调用 flush_info() 可以将 master.info 写入磁盘,每次从master读取数据都需要刷盘
  75. */
  76. class Master_info : public Rpl_info
  77. {
  78. /* 前面保存了 user、host 等连接信息 */
  79. /* 下面是连接信息的 set/get 函数 */
  80. /* 和 master 的连接 */
  81. MYSQL* mysql;
  82. /* 对应的 Relay_log_info */
  83. Relay_log_info *rli;
  84. /* IO 线程复制延迟 */
  85. long clock_diff_with_master;
  86. /* 心跳间隔 */
  87. float heartbeat_period; // interface with CHANGE MASTER or master.info
  88. /* 收到心跳次数 */
  89. ulonglong received_heartbeats; // counter of received heartbeat events
  90. /* 上次心跳时间 */
  91. time_t last_heartbeat;
  92. /* 上次收到event的时间 */
  93. time_t last_recv_event_timestamp;
  94. /* 忽略复制的server_id */
  95. Server_ids *ignore_server_ids;
  96. /* master server_id */
  97. ulong master_id;
  98. /* FORMAT_DESCRIPTION_EVENT前的checksum 算法 */
  99. binary_log::enum_binlog_checksum_alg checksum_alg_before_fd;
  100. /* 初始化 Master_info, 里面调用read_info() 从 Rpl_info_handler 中读取信息 */
  101. int mi_init_info();
  102. /* 清理 Master_info,里面会调用 Rpl_info_handler 的 eng_info() */
  103. void end_info();
  104. /* Master_info 信息落盘,每次从主库读取数据后都会执行 */
  105. int flush_info(bool force= FALSE);
  106. /*
  107. 从master收到的 Format_description_log_event 写在 relay log 末尾
  108. IO thread 开始时创建,IO thread 结束时销毁
  109. IO thread 收到一个 Format_description_log_event 时更新
  110. 每次rotate时,IO thread 写Format_description_log_event到新relay log
  111. 每次执行FLUSH LOGS,client 写Format_description_log_event到新relay log
  112. */
  113. Format_description_log_event *mi_description_event;
  114. /* 最近一个GTID,可能是未完成事务的GTID,用于事务结束时写入 Retrieved_Gtid_Set */
  115. Gtid last_gtid_queued;
  116. /* 用于判断事务边界 */
  117. Transaction_boundary_parser transaction_parser;
  118. /*
  119. channel lock,以下操作需要持有写锁
  120. START SLAVE;
  121. STOP SLAVE;
  122. CHANGE MASTER;
  123. RESET SLAVE;
  124. end_slave();
  125. */
  126. Checkable_rwlock *m_channel_lock;
  127. /* channel 被引用的次数,只有为0时可以删除channel */
  128. Atomic_int32 references;
  129. }
  1. /*
  2. 主要保存以下信息:
  3. 当前relay log
  4. 当前relay log offset
  5. master log name
  6. 与上次更新对应的主库日志序列
  7. sql thread 其他信息
  8. 初始化过程和 Master_info 类似
  9. 以下情况下 relay.info table/file 需要更新:
  10. 1. relay log file rotated
  11. 2. SQL thread stopped
  12. 3. while processing a Xid_log_event
  13. 4. after a Query_log_event(commit or rollback)
  14. 5. after processing any statement written to the binary log without a transaction context.
  15. 并行复制相关代码留作以后分析,本次暂不涉及
  16. */
  17. class Relay_log_info : public Rpl_info
  18. {
  19. /* 备份状态标志位,用于标志是否在语句中 */
  20. enum enum_state_flag {
  21. /* 在语句中 */
  22. IN_STMT,
  23. /** Flag counter. Should always be last */
  24. STATE_FLAGS_COUNT
  25. };
  26. /* 是否复制相同server_id的event,一般是false */
  27. bool replicate_same_server_id;
  28. /* 正在执行或最后一个执行的GTID,用来填充 performance_schema.replication_applier_status_by_worke 的 last_seen_transaction 列
  29. */
  30. Gtid_specification currently_executing_gtid;
  31. /* 读取下面变量时,必须受data_lock保护 */
  32. /* 当前relay log的文件描述符 */
  33. File cur_log_fd;
  34. /* reay_log 对象,MYSQL_BIN_LOG类留作以后分析*/
  35. MYSQL_BIN_LOG relay_log;
  36. /* 主要用于查询log_pos */
  37. LOG_INFO linfo;
  38. /*
  39. cur_log 指向 relay_log.get_log_file() 或者 cache_buf
  40. 取决于relay_log_file是热日志,还是需要打开冷日志
  41. cache_buf 在打开冷日志时适用
  42. */
  43. IO_CACHE cache_buf,*cur_log;
  44. /* 标识是否正在recovery */
  45. bool is_relay_log_recovery;
  46. /* 下面的变量可以不加锁读 */
  47. /*
  48. restart slave 时需要访问临时表。
  49. 这个变量值在 init/end 时修改
  50. SQL thread 只读
  51. */
  52. TABLE *save_temporary_tables;
  53. /* 对应的 Master_info */
  54. Master_info *mi;
  55. /* 打开临时表的数量 */
  56. Atomic_int32 channel_open_temp_tables;
  57. /* 保存 relay_log.get_open_count() */
  58. uint32 cur_log_old_open_count;
  59. /* init_info() 曾经失败,RESET SLAVE 可以修复错误 */
  60. bool error_on_rli_init_info;
  61. /* 这里跳过一些 Group replication 相关变量 */
  62. /* received gtid set */
  63. Gtid_set gtid_set;
  64. /* 标识此对象是否属于SQL线程(属于SQL线程为0) */
  65. bool rli_fake;
  66. /* 标识 retrieved GTID set 是否已被初始化 */
  67. bool gtid_retrieved_initialized;
  68. /* 上一个错误的GTID */
  69. Gtid last_sql_error_gtid;
  70. /* 日志空间限制,日志空间总量(用于sys_var:relay_log_space,控制relay log空间) */
  71. ulonglong log_space_limit,log_space_total;
  72. /* 是否忽略日志空间限制 */
  73. bool ignore_log_space_limit;
  74. /* 需要清理空间时,SQL线程指示IO线程rotate logs */
  75. bool sql_force_rotate_relay;
  76. /* 上次主库记录binlog的时间 */
  77. time_t last_master_timestamp;
  78. /* 上次执行event的时间 */
  79. time_t last_exec_event_timestamp;
  80. /* 跳过error event */
  81. volatile uint32 slave_skip_counter;
  82. /* 标记是否需要中断pos_wait,change master 和 reset slave时需要中断 */
  83. volatile ulong abort_pos_wait;
  84. /* log_space 相关信号量*/
  85. mysql_mutex_t log_space_lock;
  86. mysql_cond_t log_space_cond;
  87. /* 这里有一些 START SLAVE UNTIL 相关变量 */
  88. /* 重试事务次数(trans_retries是重试次数上限),重试事务计数(retried_trans记录重试了多少次) */
  89. ulong trans_retries, retried_trans;
  90. /*
  91. 延迟复制时间
  92. CHANGE MASTER TO MASTER_DELAY=X.
  93. 由data_lock保护, SQL thread 读取
  94. SQL thread 运行时该变量不可写
  95. */
  96. time_t sql_delay;
  97. /* sql_delay 结束时间 */
  98. time_t sql_delay_end;
  99. /* enum_state_flag 的标志位 */
  100. uint32 m_flags;
  101. }

函数分析

mysql_execute_command() 当做入口开始分析,可以看出change_master需要SUPER权限

  1. case SQLCOM_CHANGE_MASTER:
  2. {
  3. if (check_global_access(thd, SUPER_ACL))
  4. goto error;
  5. res= change_master_cmd(thd);
  6. break;
  7. }
  8. /*
  9. 函数具备以下功能
  10. 更改接收/执行日志的配置/位点
  11. purge relay log
  12. 删除 worker info(并行复制使用)
  13. */
  14. /* 分析 change_master 函数会略过一部分逻辑 */
  15. int change_master(THD* thd, Master_info* mi, LEX_MASTER_INFO* lex_mi,
  16. bool preserve_logs)
  17. {
  18. /*
  19. 如果SQL thread 和 IO thread已经停止,并且没有指定 relay_log_pos和relay_log_file
  20. 会purge relay log
  21. */
  22. bool need_relay_log_purge= 1;
  23. /* 为了修改 mysql.slave_master_info,需要无视read_only和super_read_only */
  24. thd->set_skip_readonly_check();
  25. /* channel 加读写锁,即将对channel做修改,函数结束时才会释放锁 */
  26. mi->channel_wrlock();
  27. /*
  28. 对 mi->run_lock 和 rli->run_lock 加锁
  29. 防止线程运行状态发生变化
  30. */
  31. lock_slave_threads(mi);
  32. /* 设置thread_mask,用来标识 IO/SQL thread 的运行状态 */
  33. init_thread_mask(&thread_mask, mi, 0);
  34. /* 设置auto_position=1需要IO/SQL thread 都不在运行状态,否则报错退出 */
  35. if (thread_mask)
  36. {
  37. if (lex_mi->auto_position != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
  38. {
  39. error= ER_SLAVE_CHANNEL_MUST_STOP;
  40. my_error(ER_SLAVE_CHANNEL_MUST_STOP, MYF(0), mi->get_channel());
  41. goto err;
  42. }
  43. /* 如果 SQL thread 和 IO thread 没有全部停止,不能purge relay log */
  44. need_relay_log_purge= 0;
  45. }
  46. /*
  47. 下面是一些错误判断,都是很明显的错误
  48. 1. 如果设置了auto_position,同时又指定了复制位点,如 relay_log_pos,报错退出
  49. 2. auto_position 需要 GTID_MODE != OFF
  50. 3. IO thread 运行时不能改变 IO thread 相关配置
  51. 4. SQL thread 运行时不能改变 SQL thread 相关配置
  52. 5. 如果指定了master_host,那么master_host不能是空串
  53. */
  54. /* 记录当前状态 */
  55. THD_STAGE_INFO(thd, stage_changing_master);
  56. /* 标识停止的线程,给load_mi_and_rli_from_repositories()使用 */
  57. init_thread_mask(&thread_mask_stopped_threads, mi, 1);
  58. /*
  59. 从仓库加载 mi 和 rli 的配置
  60. 只有停止状态的线程可以加载配置(SQL thread 对应 rli,IO thread 对应 mi)
  61. */
  62. if (load_mi_and_rli_from_repositories(mi, false, thread_mask_stopped_threads))
  63. {
  64. error= ER_MASTER_INFO;
  65. my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
  66. goto err;
  67. }
  68. /*
  69. 修改mi相关配置,并保存老配置
  70. save_ 变量中保存的老配置用于打印日志
  71. */
  72. if (have_receive_option)
  73. {
  74. strmake(saved_host, mi->host, HOSTNAME_LENGTH);
  75. strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH);
  76. saved_port= mi->port;
  77. strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1);
  78. saved_log_pos= mi->get_master_log_pos();
  79. if ((error= change_receive_options(thd, lex_mi, mi)))
  80. {
  81. goto err;
  82. }
  83. }
  84. /* 打印日志,change master 的源值和目标值 */
  85. if (have_receive_option)
  86. sql_print_information("'CHANGE MASTER TO%s executed'. "
  87. "Previous state master_host='%s', master_port= %u, master_log_file='%s', "
  88. "master_log_pos= %ld, master_bind='%s'. "
  89. "New state master_host='%s', master_port= %u, master_log_file='%s', "
  90. "master_log_pos= %ld, master_bind='%s'.",
  91. mi->get_for_channel_str(true),
  92. saved_host, saved_port, saved_log_name, (ulong) saved_log_pos,
  93. saved_bind_addr, mi->host, mi->port, mi->get_master_log_name(),
  94. (ulong) mi->get_master_log_pos(), mi->bind_addr);
  95. /* 修改rli相关配置 */
  96. if (have_execute_option)
  97. change_execute_options(lex_mi, mi);
  98. /* 持久化master_info */
  99. if ((thread_mask & SLAVE_IO) == 0 && flush_master_info(mi, true))
  100. {
  101. error= ER_RELAY_LOG_INIT;
  102. my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
  103. goto err;
  104. }
  105. if ((thread_mask & SLAVE_SQL) == 0)
  106. {
  107. /* 记录全局变量 relay_log_purge */
  108. bool save_relay_log_purge= relay_log_purge;
  109. if (need_relay_log_purge)
  110. {
  111. const char* errmsg= 0;
  112. /* purge relay logs */
  113. relay_log_purge= 1;
  114. THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
  115. if (mi->rli->purge_relay_logs(thd,
  116. 0 /* not only reset, but also reinit */,
  117. &errmsg))
  118. {
  119. error= ER_RELAY_LOG_FAIL;
  120. my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
  121. goto err;
  122. }
  123. }
  124. else
  125. {
  126. const char* msg;
  127. relay_log_purge= 0;
  128. DBUG_ASSERT(mi->rli->inited);
  129. /*初始化 relay_log_pos */
  130. if (mi->rli->init_relay_log_pos(mi->rli->get_group_relay_log_name(),
  131. mi->rli->get_group_relay_log_pos(),
  132. true/*we do need mi->rli->data_lock*/,
  133. &msg, 0))
  134. {
  135. error= ER_RELAY_LOG_INIT;
  136. my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
  137. goto err;
  138. }
  139. }
  140. /* 恢复全局变量 relay_log_purge 的值 */
  141. relay_log_purge= save_relay_log_purge;
  142. /* 清理until condition */
  143. mi->rli->clear_until_condition();
  144. /* relay_log_info 持久化到磁盘 */
  145. if (mi->rli->flush_info(true))
  146. {
  147. error= ER_RELAY_LOG_INIT;
  148. my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush relay info file.");
  149. goto err;
  150. }
  151. }
  152. /* 出错后跳到次数,释放之前申请的锁 */
  153. err:
  154. unlock_slave_threads(mi);
  155. mi->channel_unlock();
  156. DBUG_RETURN(error);
  157. }

总结

change master 主要功能是修改 SQL 和 IO 线程的配置信息,执行时可能会purge relay log

没有特殊情况,建议指定auto_position=1,不要自己指定复制位点,避免数据丢失风险

如需对change master 做修改,需要注意在锁保护下修改变量,同时注意加锁顺序,避免死锁