566 字 | 2 分钟

网络线程池示意图

network

MainThd

主线程,模块初始化及创建相关线程,等待shutdown信号 调用堆栈 main

  1. -- tendisplus::ServerEntry->startup
  2. ---- 初始化 rocksdb/ WorkerPool / NetworkAsio
  3. ---- WorkerPool::startup
  4. ------ new thread(WorkerPool::consumeTasks()/_ioCtx->run()) n
  5. ---- NetworkAsio::prepare
  6. ------ asio::ip::tcp::acceptor _acceptor(_acceptCtx, tcp::endpoint); //绑定端口
  7. ---- ReplManager::startup
  8. ---- NetworkAsio::run
  9. ------ new thread(_acceptThd)
  10. ------ new thread(asio::io_context::work(_rwCtx); _rwCtx->run()) n
  11. ------ NetworkAsio::doAccept
  12. -------- _acceptor->async_accept(_rwCtx, std::move(cb))
  13. -- setupSignals
  14. -- waitForExit

_acceptThd

监听线程,绑定端口进行监听

  1. NetworkAsio::doAccept->cb
  2. -- ServerEntry::addSession(new NetSession);
  3. ---- NetSession::start
  4. ------ NetSession::stepState
  5. -------- drainReqNet
  6. ---------- _sock.async_read_some(buffer, []{NetSession::drainReqCallback})

NetworkAsio

网络IO线程池 , 负责命令响应,并发到工作线程任务队列

  1. NetSession::drainReqCallback (read回调)
  2. -- processInlineBuffer
  3. ---- NetSession::setState(State::Process);
  4. ---- NetSession::schedule()
  5. ------ ServerEntry::schedule
  6. -------- WorkPool::schedule([]{NetSession::stepState})
  7. ---------- asio::post(*_ioCtx, std::move([]{NetSession::stepState}))
  8. 命令应答,向客户端返回消息。(readwrite解耦,不用互相等待)
  9. NetSession::drainRspCallback (write回调)
  10. -- drainRsp
  11. ---- asio::async_write(_sock, buf, []{NetSession::drainRspCallback}

网络模型如下图所示: network

ServerEntry::_executor

工作线程池,处理命令解析并执行

  1. NetSession::stepState
  2. -- NetSession::processReq
  3. ---- ServerEntry::processRequest
  4. ------ Command::precheck
  5. ------ Command::runSessionCmd
  6. ------ NetSession::setResponse
  7. -------- NetSession::_isSendRunning = true or _sendBuffer.push_back
  8. -------- NetSession::drainRsp
  9. ---------- asio::async_write(_sock buffer, size, [] {drainRspCallback});
  10. ---- NetSession::setState(State::DrainReqNet) or NetSession::setState(State::DrainReqBuf)
  11. ---- NetSession::schedule()

复制相关线程池

1.ReplManager::_fullReceiver slave向master发起全量复制命令fullsync,并同步处理master的backup

  1. ReplManager::slaveSyncRoutine
  2. --ReplManager::slaveStartFullsync (StoreMeta::replState == ReplState::REPL_CONNECT)
  3. ---- store->stop
  4. ---- client = ReplManager::createClient()
  5. ---- SET STATE TO REPL_TRANFER
  6. ---- getBackupInfo
  7. ------ client->writeLine("fullsync ...")
  8. ---- get all files
  9. ---- client->writeLine("+OK")
  10. ---- store->restart
  11. ---- SET STATE TO REPL_CONNECTED

2.ReplManager::_incrChecker 线程数:2 slave向master发起增量复制命令incrsync,并关联的session信息

  1. ReplManager::slaveSyncRoutine
  2. -- ReplManager::slaveChkSyncStatus (StoreMeta::replState == ReplState::REPL_CONNECTED)
  3. ---- if _syncStatus[storeId]->sessionid is valid return; # noop
  4. ---- client = ReplManager::createClient()
  5. ---- client->writeLine("INCRSYNC", ...)
  6. ---- client->readLine
  7. ---- client->writeLine("+PONG") # Master should sending binlog after read the "+PONG"
  8. ---- ReplManager::_syncStatus[storeId]->sessionId = sessionId

线程池_fullReciver, _incrChecker只有在slave初次建立,或slave重启后才真正工作,大部分时间都是空跑。

3.ReplManager::_fullPusher

  1. ReplManager::supplyFullSyncRoutine

4.ReplManager::_incrPusher 5.ReplManager::_logRecycler

cluster相关线程池

  1. MigrateManager:: _migrateReceiver 搬迁接收方处理任务的线程池,默认4个
  1. receiveSchedule
  2. -- MigrateReceiveState::RECEIVE_SNAPSHOT
  3. -- fullRecieve
  4. -- receiveSnapShot
  5. -- client->writeLine("readymigrate ", taskid)
  6. -- client->readLine
  1. MigrateManage:: _migrateSender 搬迁发送方处理任务的线程池,默认4个
  1. senderSchedule
  2. --- MigrateSendState::START ,
  3. ---_migrateSender->schedule()
  4. --- sendSlots
  5. --- sendSnaphsot
  6. --- sendBinlog
  7. --- client->writeLine("migrateend", taskid)
  8. --- client->readLine
  9. --- setslot
  1. GCManager:_gcDeleter 清理脏数据的线程池,默认1个
  1. --- DeleteRangeState::START
  2. --- task->garbageDelete
  3. --- deleteSlotRange