环形缓冲区(Ring Buffer)是一种常见的用于数据流缓冲的结构,通常用于生产者-消费者模型、音视频处理等场景。
因为环形缓冲区使用的场景大多为性能敏感的场景,我们采用数组的数据结构和位运算来实现,以提高代码效率。位运算的效率要高于模运算,但是用位运算替代模运算的前提是缓冲区的大小必须为 2 的整数次幂,因为对于 2 的幂来说,模运算就是屏蔽高位,这个在下面展示代码的时候细说。
因为要适配不同的类型,在头文件中使用模板,由于模板类和模板函数是在使用时才实例化的,编译器需要在包含模板的地方就能看到其完整实现,如果编译器看不到它的实现,在链接时就会报错(undefined reference),通常不能将模板的实现写在.cpp
文件中。但是也不推荐把模板类的声明和定义全写在.h
文件里,推荐的方式是.h + .tpp
的方式,这样可以分离接口与实现,提高可读性,也可以避免不必要的重复编译——如果都写在.h
文件中,每次这个头文件被#include
,就会重新编译一遍模板定义,编译时间会变长。
环形缓冲区类的头文件 RingBuffer.h
#ifndef RINGBUFFER_H
#define RINGBUFFER_Htemplate<typename T, size_t Capacity>
class RingBuffer {static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2");public:RingBuffer();bool push(const T& item);bool pop(T& item);bool empty() const;bool full() const;size_t size() const;void reset();private:T buffer_[Capacity];size_t head_;size_t tail_;bool full_;
};#include "RingBuffer.tpp"#endif
环形缓冲区类的实现部分RingBuffer.tpp
:
#ifndef RINGBUFFER_TPP
#define RINGBUFFER_TPPtemplate<typename T, size_t Capacity>
RingBuffer<T, Capacity>::RingBuffer(): head_(0),tail_(0),full_(false) {}template<typename T, size_t Capacity>
bool RingBuffer<T, Capacity>::push(const T &item) {if (full_) return false;buffer_[head_] = item;head_ = (head_ + 1) & (Capacity - 1);if (head_ == tail_) full_ = true;return true;
}template<typename T, size_t Capacity>
bool RingBuffer<T, Capacity>::pop(T &item) {if (empty()) return false;item = buffer_[tail_];tail_ = (tail_ + 1) & (Capacity - 1);full_ = false;return true;
}template<typename T, size_t Capacity>
bool RingBuffer<T, Capacity>::empty() const {return (!full_ && (head_ == tail_));
}template<typename T, size_t Capacity>
bool RingBuffer<T, Capacity>::full() const {return full_;
}template<typename T, size_t Capacity>
size_t RingBuffer<T, Capacity>::size() const {if (full_) return Capacity;if (head_ >= tail_) return head_ - tail_;return head_ + Capacity - tail_;
}template<typename T, size_t Capacity>
void RingBuffer<T, Capacity>::reset() {head_ = tail_ = 0;full_ = false;
}#endif
主函数代码:
#include <iostream>
#include "RingBuffer.h"using namespace std;int main() {RingBuffer<int, 8> buffer;for (int i = 0; i < 7; ++i) {if (buffer.push(i))cout << "Pushed: " << i << "\n";elsecout << "Buffer full, cannot push: " << i << "\n";}int val;while (buffer.pop(val)) {cout << "Popped: " << val << "\n";cout << buffer.size() << endl;}return 0;
}
如何判断一个数是 2 的幂?可以通过:
(Capacity & (Capacity - 1)) == 0
因为 2 的幂都是形如 1000 这样的数字,减一后除了首位外全为 1,利用&
的位运算之后全为 0。
如何用位运算替代模运算?就是利用位运算屏蔽高位。例如用 a & (b - 1)
替代 a % b
,前提 b 是 2 的整数次幂。例如 1111 % 1000,就是把高于 000 的位数全部去掉,因此可以利用 1000 - 1 = 0111 的高位 0 来“与”掉所有的高位,因为 0 与任何数还是 0 ,1 与任何数还是数本身。例如:
15 % 8 == 7
0b1111 & 0b0111 = 0b0111
注意这里不能用移位操作( >> 或者 << ),左移和右移操作替代的是除法和乘法:
x / 2^n → x >> n
x * 2^n → x << n
环形缓冲区的头尾初始值都是 0,符合条件时进行 push 和 pop 操作时,head_ 和 tail_ 的值都后移。
每次执行 push 操作,先检测一下缓冲区是否是满的,如果不满就将数据插入头位置,然后把头位置后移一位,如果 head_ + 1 大于 Capacity,则触发一次回绕,通过求余(通过模运算或者位运算)得到新的 head_。除了初始状态head_
= tail_
= 0,full_ = false
,后续的操作中,如果head_
和tail_
的值相同,则判定现在缓冲区已满(因为 head_ == tail_
时既可能是空也可能是满,必须通过 full_
标志来区分)。
size() 函数在处理 head_ < tail_ 的情况时(这种情况出现在多次 push 操作使得回绕被触发,且 pop 的操作次数少于 push 操作的时候),计算缓冲区中的数据量时需要用 Capacity 减去 head_ 和 tail_ 的差值。