bRPC源码解析·IOBuf

(作者简介:KIDGINBROOK,在昆仑芯参与训练框架开发工作)

brpc使用butil::IOBuf作为一些协议中的附件或http body的数据结构,它是一种非连续零拷贝缓冲,在其他项目中得到了验证并有出色的性能。IOBuf的接口和std::string类似,但不相同。

整体架构如下所示,从上到下结构分别为IOBuf,BlockRef和Block,Block负责实际数据的存储,一个IOBuf通过多个BlockRef引用多个Block。 IOBuf

Block

首先看下Block的结构,block就是一段内存,默认大小为8k,负责数据的实际存储。size表示使用了多少内存,cap为这段内存的容量,数据存储在data,portal_next指向在链表结构下的一块block。

  1. struct IOBuf::Block {
  2. butil::atomic<int> nshared;
  3. uint16_t flags;
  4. uint16_t abi_check; // original cap, never be zero.
  5. uint32_t size;
  6. uint32_t cap;
  7. // When flag is 0, portal_next is valid.
  8. // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data_meta is valid.
  9. union {
  10. Block* portal_next;
  11. uint64_t data_meta;
  12. } u;
  13. // When flag is 0, data points to `size` bytes starting at `(char*)this+sizeof(Block)'
  14. // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data points to the user data and
  15. // the deleter is put in UserDataExtension at `(char*)this+sizeof(Block)'
  16. char* data;
  17. ...
  18. }

创建一个block的接口为create_block,block通过blockmem_allocate申请内存,默认设置为malloc,用户可以通过设置blockmem_allocate实现自定义的内存分配,如rdma场景即可通过改写blockmem_allocate实现锁页内存的分配。通过malloc申请到8k内存mem后,在mem上调用placement new,因为block本身也占用了内存,因此实际数据存储data指向mem + sizeof(Block),容量大小为8k-sizeof(Block)。block的写入永远是追加写,不会修改已写入的内容;为了避免全局竞争带来的开销,block引入了tls优化。

  1. static const size_t butil::IOBuf::DEFAULT_BLOCK_SIZE = 8192UL;
  2. void* (*blockmem_allocate)(size_t) = ::malloc;
  3. inline IOBuf::Block* create_block() {
  4. return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
  5. }
  6. inline IOBuf::Block* create_block(const size_t block_size) {
  7. if (block_size > 0xFFFFFFFFULL) {
  8. LOG(FATAL) << "block_size=" << block_size << " is too large";
  9. return NULL;
  10. }
  11. char* mem = (char*)iobuf::blockmem_allocate(block_size);
  12. if (mem == NULL) {
  13. return NULL;
  14. }
  15. return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
  16. block_size - sizeof(IOBuf::Block));
  17. }

block中的nshared字段表示当前block被多少个BlockRef所引用,初始值为1,当block没有被引用时便会被释放。

  1. void inc_ref() {
  2. check_abi();
  3. nshared.fetch_add(1, butil::memory_order_relaxed);
  4. }
  5. void dec_ref() {
  6. check_abi();
  7. if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
  8. butil::atomic_thread_fence(butil::memory_order_acquire);
  9. if (!flags) {
  10. iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
  11. iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
  12. butil::memory_order_relaxed);
  13. this->~Block();
  14. iobuf::blockmem_deallocate(this);
  15. } else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
  16. get_user_data_extension()->deleter(data);
  17. this->~Block();
  18. free(this);
  19. }
  20. }
  21. }

BlockRef

BlockRef引用了一个block,指向了一个block中的一段区域,开始位置为offset,长度为length。

  1. struct BlockRef {
  2. // NOTICE: first bit of `offset' is shared with BigView::start
  3. uint32_t offset;
  4. uint32_t length;
  5. Block* block;
  6. };

IOBuf

然后看下IOBuf,IOBuf本质就是管理了多个BlockRef,IOBuf的主要结构为一个BigView和SmallView的联合体

  1. union {
  2. BigView _bv;
  3. SmallView _sv;
  4. };

sv表示两个Ref,而bv表示一个ref数组

  1. struct SmallView {
  2. BlockRef refs[2];
  3. };
  4. struct BigView {
  5. int32_t magic;
  6. uint32_t start;
  7. BlockRef* refs;
  8. uint32_t nref;
  9. uint32_t cap_mask;
  10. size_t nbytes;
  11. ...
  12. }

主要api

然后介绍下IOBuf的主要api。

默认构造函数

首先是默认构造函数,默认为sv

  1. inline void reset_block_ref(IOBuf::BlockRef& ref) {
  2. ref.offset = 0;
  3. ref.length = 0;
  4. ref.block = NULL;
  5. }
  6. inline IOBuf::IOBuf() {
  7. reset_block_ref(_sv.refs[0]);
  8. reset_block_ref(_sv.refs[1]);
  9. }

append一个IOBuf

append一个IOBuf,不涉及到实际内存的拷贝,只要push下other的ref即可,主要逻辑就是ref的合并;遍历other所有的ref,然后调用_push_back_ref

  1. void IOBuf::append(const IOBuf& other) {
  2. const size_t nref = other._ref_num();
  3. for (size_t i = 0; i < nref; ++i) {
  4. _push_back_ref(other._ref_at(i));
  5. }
  6. }
  7. inline void IOBuf::_push_back_ref(const BlockRef& r) {
  8. if (_small()) {
  9. return _push_or_move_back_ref_to_smallview<false>(r);
  10. } else {
  11. return _push_or_move_back_ref_to_bigview<false>(r);
  12. }
  13. }
  1. 如果当前IOBuf为sv,那么调用_push_or_move_back_ref_to_smallview,流程如下:

    1. 如果当前IOBuf没有ref,那么将sv[0]设置为r,并将r所引用的block进行inc_ref()
    2. 如果当前IOBuf的ref[1]为空,那么判断ref[0]和r是否可以合并,如果两个ref引用的block一致,且两段区域连续,那么就将r合并到ref[0]上,否则将sv[1]设置为r
    3. 如果当前IOBuf的ref均不为空,那么尝试合并r到ref[1],若无法合并,则将当前iobuf由sv转成bv,为bv申请一定量的ref数组,并将r添加到bv中
  2. 如果当前IOBuf为bv,_push_or_move_back_ref_to_bigview则判断r是否能与bv的最后一个合并,若不能,则添加到bv最后。

append一个std::string

此时会涉及内存的拷贝

  1. inline int IOBuf::append(const std::string& s) {
  2. return append(s.data(), s.length());
  3. }
  4. int IOBuf::append(void const* data, size_t count) {
  5. if (BAIDU_UNLIKELY(!data)) {
  6. return -1;
  7. }
  8. if (count == 1) {
  9. return push_back(*((char const*)data));
  10. }
  11. ...
  12. }

如果只append一个字符,就会调用push_back()接口,share_tls_block接口是获取到一个未满的block,这个接口后面会介绍,然后将这个字符加到block中,创建ref并push到当前iobuf中,即完成了append。

  1. int IOBuf::push_back(char c) {
  2. IOBuf::Block* b = iobuf::share_tls_block();
  3. if (BAIDU_UNLIKELY(!b)) {
  4. return -1;
  5. }
  6. b->data[b->size] = c;
  7. const IOBuf::BlockRef r = { b->size, 1, b };
  8. ++b->size;
  9. _push_back_ref(r);
  10. return 0;
  11. }

如果是append多个字符,那么循环调用share_tls_block得到未满的block,memcpy data到block中,直到拷贝完所有字符。

appendv

  1. int IOBuf::appendv(const const_iovec* vec, size_t n)

该接口是将n个iovec的数据append到iobuf,和上述原理差不多,就是循环执行。 其他对于cut类,pop类接口,原理和append也比较相近,不再赘述。

TLS优化

然后看下之前提到的tls优化。 TLSData就是一个block链表,每个线程有个thread local的TLSData,num_blocks表示cache了多少个block,registered表示是否注册了线程退出对tls清理的函数。

  1. struct TLSData {
  2. // Head of the TLS block chain.
  3. IOBuf::Block* block_head;
  4. // Number of TLS blocks
  5. int num_blocks;
  6. // True if the remote_tls_block_chain is registered to the thread.
  7. bool registered;
  8. };

然后看下上文提到的share_tls_block 首先是判断当前线程TLSData头结点,如果头结点b不是null且未满,那么直接返回b;如果b已经满了,那么将b从tls中移除,dec_ref,并走到b的下一个节点new_block;如果b是null,那么如果没注册线程退出函数就注册下,如果new_block为null,那么通过create_block申请一个block并加入到tls链表中。

  1. IOBuf::Block* share_tls_block() {
  2. TLSData& tls_data = g_tls_data;
  3. IOBuf::Block* const b = tls_data.block_head;
  4. if (b != NULL && !b->full()) {
  5. return b;
  6. }
  7. IOBuf::Block* new_block = NULL;
  8. if (b) {
  9. new_block = b;
  10. while (new_block && new_block->full()) {
  11. IOBuf::Block* const saved_next = new_block->u.portal_next;
  12. new_block->dec_ref();
  13. --tls_data.num_blocks;
  14. new_block = saved_next;
  15. }
  16. } else if (!tls_data.registered) {
  17. tls_data.registered = true;
  18. // Only register atexit at the first time
  19. butil::thread_atexit(remove_tls_block_chain);
  20. }
  21. if (!new_block) {
  22. new_block = create_block(); // may be NULL
  23. if (new_block) {
  24. ++tls_data.num_blocks;
  25. }
  26. }
  27. tls_data.block_head = new_block;
  28. return new_block;
  29. }

而acquire_tls_block接口和share_tls_block类似,区别是acquire会将返回的block移除tls,而share不会移除。

release_tls_block,release_tls_block_chain则是将单个/多个block归还TLSData

这里申请和归还block并不能保证在同一个thread,可能会出现在A线程申请,在B线程归还的场景。

IOPortal

然后看下IOPortal,IOPortal是IOBuf的子类,可以从fd中读数据,一般用于和socket的交互。 do while循环做的事情是不断申请block,直到这些block剩余空间能够存下max_count的数据;并初始化好iovec,iovec初始化为这些block;然后调用readv或者preadv,最后根据block生成blockref。

  1. ssize_t IOPortal::pappend_from_file_descriptor(
  2. int fd, off_t offset, size_t max_count) {
  3. iovec vec[MAX_APPEND_IOVEC];
  4. int nvec = 0;
  5. size_t space = 0;
  6. Block* prev_p = NULL;
  7. Block* p = _block;
  8. // Prepare at most MAX_APPEND_IOVEC blocks or space of blocks >= max_count
  9. do {
  10. if (p == NULL) {
  11. p = iobuf::acquire_tls_block();
  12. if (BAIDU_UNLIKELY(!p)) {
  13. errno = ENOMEM;
  14. return -1;
  15. }
  16. if (prev_p != NULL) {
  17. prev_p->u.portal_next = p;
  18. } else {
  19. _block = p;
  20. }
  21. }
  22. vec[nvec].iov_base = p->data + p->size;
  23. vec[nvec].iov_len = std::min(p->left_space(), max_count - space);
  24. space += vec[nvec].iov_len;
  25. ++nvec;
  26. if (space >= max_count || nvec >= MAX_APPEND_IOVEC) {
  27. break;
  28. }
  29. prev_p = p;
  30. p = p->u.portal_next;
  31. } while (1);
  32. ...

append_from_reader和pappend_from_file_descriptor逻辑差不多,区别是前者使用IReader的ReadV,后者使用系统调用readv或者preadv。

  1. ssize_t IOPortal::pappend_from_file_descriptor(
  2. int fd, off_t offset, size_t max_count) {
  3. ...
  4. ssize_t nr = 0;
  5. if (offset < 0) {
  6. nr = readv(fd, vec, nvec);
  7. } else {
  8. static iobuf::iov_function preadv_func = iobuf::get_preadv_func();
  9. nr = preadv_func(fd, vec, nvec, offset);
  10. }
  11. if (nr <= 0) { // -1 or 0
  12. if (empty()) {
  13. return_cached_blocks();
  14. }
  15. return nr;
  16. }
  17. ...
  18. }

最后构造BlockRef

  1. ssize_t IOPortal::pappend_from_file_descriptor(
  2. int fd, off_t offset, size_t max_count) {
  3. ...
  4. size_t total_len = nr;
  5. do {
  6. const size_t len = std::min(total_len, _block->left_space());
  7. total_len -= len;
  8. const IOBuf::BlockRef r = { _block->size, (uint32_t)len, _block };
  9. _push_back_ref(r);
  10. _block->size += len;
  11. if (_block->full()) {
  12. Block* const saved_next = _block->u.portal_next;
  13. _block->dec_ref(); // _block may be deleted
  14. _block = saved_next;
  15. }
  16. } while (total_len);
  17. return nr;
  18. }

protobuf接口

最后是基于protobuf的IOBufAsZeroCopyInputStream和IOBufAsZeroCopyOutputStream,继承自google::protobuf::io::ZeroCopyInputStream 和google::protobuf::io::ZeroCopyOutputStream,目的是为了消除用户逻辑同stream交互时发生的拷贝,例如从stream的内存到用户的buf间的拷贝;具体做法为buf的内存不应该由用户逻辑管理,而是由stream来管理;对外暴露两个接口,分别为

  1. bool Next(void** data, int* size) // 返回一段可写入的连续内存(*data),长度为(*size)
  2. void BackUp(int count) // 归还不需要使用的内存。

然后看下next接口

  1. bool IOBufAsZeroCopyOutputStream::Next(void** data, int* size) {
  2. if (_cur_block == NULL || _cur_block->full()) {
  3. _release_block();
  4. if (_block_size > 0) {
  5. _cur_block = iobuf::create_block(_block_size);
  6. } else {
  7. _cur_block = iobuf::acquire_tls_block();
  8. }
  9. if (_cur_block == NULL) {
  10. return false;
  11. }
  12. }
  13. const IOBuf::BlockRef r = { _cur_block->size,
  14. (uint32_t)_cur_block->left_space(),
  15. _cur_block };
  16. *data = _cur_block->data + r.offset;
  17. *size = r.length;
  18. _cur_block->size = _cur_block->cap;
  19. _buf->_push_back_ref(r);
  20. _byte_count += r.length;
  21. return true;
  22. }

其实就是申请一个block,然后返回,size为这个block的剩余空间,也因为这样,所以需要acquire来占住整个block。

修改于 2023年11月3日: Release bRPC 1.7.0 (#164) (c91a354)