使用C11里的原子变量实现,没有用互斥锁,效率更高。
ring_buffer.h:
/*** @file ring_buffer.h* @author tl* @brief 单生产者多消费者环形缓冲区,每条数据被所有消费者读后才释放。读线程安全,写仅单线程。* @version* @date 2025-08-06** @copyright Copyright (c) 2025**/#ifndef _RINGBUFFER_H_
#define _RINGBUFFER_H_#ifdef __cplusplus
extern "C"
{
#endif#include <stdint.h>
#include <stddef.h>/*** @brief 环形缓冲区数据包头结构*/typedef struct __attribute__((packed)){uint32_t length; ///< 负载数据长度uint16_t type; ///< 类型uint16_t reserved; ///< 预留扩展} mu_ring_buffer_pkt_hdr;/*** @brief 环形缓冲区句柄*/typedef struct mu_ring_buffer mu_ring_buffer;/*** @brief 环形缓冲区读者句柄*/typedef struct mu_ring_buffer_reader mu_ring_buffer_reader;/*** @brief 创建环形缓冲区* @param capacity 缓冲区总大小* @return mu_ring_buffer* 成功返回指针,失败返回NULL*/mu_ring_buffer *mu_ring_buffer_create(size_t capacity);/*** @brief 销毁环形缓冲区** @param pp_rb 缓冲区指针的地址*/void mu_ring_buffer_destroy(mu_ring_buffer **pp_rb);/*** @brief 添加一个新的读者** @param rb 环形缓冲区指针* @return mu_ring_buffer_reader* 成功返回读者指针,失败返回NULL*/mu_ring_buffer_reader *mu_ring_buffer_add_reader(mu_ring_buffer *rb);/*** @brief 删除读者** @param rb 环形缓冲区指针* @param pp_reader 读者指针的地址* @note 本函数不保证多线程并发安全,调用前请确保该读者没有在读取数据*/void mu_ring_buffer_remove_reader(mu_ring_buffer *rb, mu_ring_buffer_reader **pp_reader);/*** @brief 写入数据到环形缓冲区** @param rb 环形缓冲区指针* @param hdr 包头* @param data 负载数据* @return int 成功返回 0,失败返回负数(-1参数错误或内部错误,-2空间不足)* @note 仅支持单线程,不支持多线程并发写入*/int mu_ring_buffer_write(mu_ring_buffer *rb, const mu_ring_buffer_pkt_hdr *hdr, const void *data);/*** @brief 从环形缓冲区读取数据** @param reader 读者指针* @param hdr 包头* @param data 负载数据缓冲区* @param data_size data缓冲区大小* @return int 成功返回 0,失败返回负数(-1参数或内部错误,-2无数据,-3缓冲区空间不足)* @note 线程安全,支持多线程并发读取*/int mu_ring_buffer_read(mu_ring_buffer_reader *reader, mu_ring_buffer_pkt_hdr *hdr, void *data, size_t data_size);#ifdef __cplusplus
}
#endif#endif // _RINGBUFFER_H_
ring_buffer.c:
#include "ring_buffer.h"
#include <stdlib.h>
#include <string.h>
#include <stdatomic.h>
#include <assert.h>#define PKT_HDR_SIZE (sizeof(mu_ring_buffer_pkt_hdr))
#define MAX_READERS 32struct mu_ring_buffer_reader
{unsigned index;struct mu_ring_buffer *rb;
};struct mu_ring_buffer
{uint8_t *buffer;size_t capacity; // 缓冲区容量atomic_size_t tail; // 写指针atomic_size_t *reader_heads; // 每个读者的读指针(head)atomic_uint_fast32_t reader_bitmap; // 读者分配bitmap
};static inline unsigned find_first_zero_bit(uint32_t v)
{for (unsigned i = 0; i < MAX_READERS; ++i)if (!(v & (1u << i)))return i;return MAX_READERS;
}mu_ring_buffer *mu_ring_buffer_create(size_t capacity)
{if (capacity < PKT_HDR_SIZE + 1)return NULL;mu_ring_buffer *rb = (mu_ring_buffer *)calloc(1, sizeof(mu_ring_buffer));if (!rb)return NULL;rb->buffer = (uint8_t *)malloc(capacity);if (!rb->buffer){free(rb);return NULL;}rb->capacity = capacity;rb->reader_heads = (atomic_size_t *)calloc(MAX_READERS, sizeof(atomic_size_t));if (!rb->reader_heads){free(rb->buffer);free(rb);return NULL;}atomic_store(&rb->tail, 0);atomic_store(&rb->reader_bitmap, 0);// 初始化所有reader_heads为(size_t)-1for (unsigned i = 0; i < MAX_READERS; ++i)atomic_store(&rb->reader_heads[i], (size_t)-1);return rb;
}void mu_ring_buffer_destroy(mu_ring_buffer **pp_rb)
{if (!pp_rb || !*pp_rb)return;mu_ring_buffer *rb = *pp_rb;if (rb->reader_heads)free(rb->reader_heads);if (rb->buffer)free(rb->buffer);free(rb);*pp_rb = NULL;
}mu_ring_buffer_reader *mu_ring_buffer_add_reader(mu_ring_buffer *rb)
{if (!rb)return NULL;uint_fast32_t old, newval;do{old = atomic_load(&rb->reader_bitmap);unsigned idx = find_first_zero_bit(old);if (idx >= MAX_READERS)return NULL;newval = old | (1u << idx);if (atomic_compare_exchange_weak(&rb->reader_bitmap, &old, newval)){mu_ring_buffer_reader *r = (mu_ring_buffer_reader *)malloc(sizeof(mu_ring_buffer_reader));if (!r)return NULL;r->index = idx;r->rb = rb;// 新读者head指针初始化为tailatomic_store(&rb->reader_heads[idx], atomic_load(&rb->tail));return r;}} while (1);
}void mu_ring_buffer_remove_reader(mu_ring_buffer *rb, mu_ring_buffer_reader **pp_reader)
{if (!rb || !pp_reader || !*pp_reader)return;mu_ring_buffer_reader *reader = *pp_reader;unsigned idx = reader->index;atomic_store(&rb->reader_heads[idx], (size_t)-1); // 标记为无效uint32_t mask = ~(1u << idx);atomic_fetch_and(&rb->reader_bitmap, mask);free(reader);*pp_reader = NULL;
}// 返回所有有效reader head的最小值
static inline size_t min_reader_head(mu_ring_buffer *rb)
{size_t min = atomic_load(&rb->tail);uint32_t bm = atomic_load(&rb->reader_bitmap);int found = 0;for (unsigned i = 0; i < MAX_READERS; ++i){if (bm & (1u << i)){size_t h = atomic_load_explicit(&rb->reader_heads[i], memory_order_acquire);if (h == (size_t)-1)continue; // 已注销if (!found || ((h + rb->capacity - min) % rb->capacity < (min + rb->capacity - min) % rb->capacity)){min = h;found = 1;}}}return min;
}static inline void ring_buffer_write_data(mu_ring_buffer *rb, size_t offset, const void *src, size_t length)
{if (offset + length <= rb->capacity){memcpy(rb->buffer + offset, src, length);}else{size_t first = rb->capacity - offset;memcpy(rb->buffer + offset, src, first);memcpy(rb->buffer, (const uint8_t *)src + first, length - first);}
}static inline void ring_buffer_read_data(mu_ring_buffer *rb, size_t offset, void *dst, size_t length)
{if (offset + length <= rb->capacity){memcpy(dst, rb->buffer + offset, length);}else{size_t first = rb->capacity - offset;memcpy(dst, rb->buffer + offset, first);memcpy((uint8_t *)dst + first, rb->buffer, length - first);}
}int mu_ring_buffer_write(mu_ring_buffer *rb, const mu_ring_buffer_pkt_hdr *hdr, const void *data)
{if (!rb || !hdr || !data || hdr->length == 0)return -1;size_t total = PKT_HDR_SIZE + hdr->length;if (total > rb->capacity - 1)return -2;size_t tail = atomic_load_explicit(&rb->tail, memory_order_relaxed);size_t min_head = min_reader_head(rb);size_t used = (tail + rb->capacity - min_head) % rb->capacity;size_t free_space = rb->capacity - used - 1;if (free_space < total)return -2;// 写包头ring_buffer_write_data(rb, tail, hdr, PKT_HDR_SIZE);// 写数据size_t data_offset = (tail + PKT_HDR_SIZE) % rb->capacity;ring_buffer_write_data(rb, data_offset, data, hdr->length);// 更新tailatomic_store_explicit(&rb->tail, (tail + total) % rb->capacity, memory_order_release);return 0;
}int mu_ring_buffer_read(mu_ring_buffer_reader *reader, mu_ring_buffer_pkt_hdr *hdr, void *data, size_t data_size)
{if (!reader || !hdr || !data)return -1;mu_ring_buffer *rb = reader->rb;unsigned idx = reader->index;size_t head = atomic_load_explicit(&rb->reader_heads[idx], memory_order_relaxed);size_t tail = atomic_load_explicit(&rb->tail, memory_order_acquire);if (head == (size_t)-1) // 已删除return -1;if (head == tail) // 没有新数据return -2;// 读取包头ring_buffer_read_data(rb, head, hdr, PKT_HDR_SIZE);if (hdr->length == 0)return -1;// 检查数据完整性size_t pkt_total = PKT_HDR_SIZE + hdr->length;size_t avail = (tail + rb->capacity - head) % rb->capacity; // 计算可读数据量if (avail < pkt_total)return -1;if (data_size < hdr->length)return -3;// 读取数据size_t data_offset = (head + PKT_HDR_SIZE) % rb->capacity;ring_buffer_read_data(rb, data_offset, data, hdr->length);// 更新自己的headatomic_store_explicit(&rb->reader_heads[idx], (head + pkt_total) % rb->capacity, memory_order_release);return 0;
}
单元测试 main.c:
#include "ring_buffer.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <time.h>
#include <errno.h>// 测试统计
static int tests_run = 0;
static int tests_passed = 0;#define TEST_ASSERT(condition, message) \do \{ \tests_run++; \if (condition) \{ \tests_passed++; \printf("✓ %s\n", message); \} \else \{ \printf("✗ %s\n", message); \} \} while (0)// 测试用例1: 基本创建和销毁
void test_basic_create_destroy()
{printf("\n=== 测试基本创建和销毁 ===\n");// 正常创建mu_ring_buffer *rb = mu_ring_buffer_create(1024);TEST_ASSERT(rb != NULL, "正常大小缓冲区创建");// 销毁mu_ring_buffer_destroy(&rb);TEST_ASSERT(rb == NULL, "缓冲区销毁后指针为NULL");// 边界条件测试mu_ring_buffer *rb_small = mu_ring_buffer_create(sizeof(mu_ring_buffer_pkt_hdr));TEST_ASSERT(rb_small == NULL, "过小容量创建失败");mu_ring_buffer *rb_min = mu_ring_buffer_create(sizeof(mu_ring_buffer_pkt_hdr) + 1);TEST_ASSERT(rb_min != NULL, "最小容量创建成功");mu_ring_buffer_destroy(&rb_min);// 重复销毁mu_ring_buffer_destroy(&rb);TEST_ASSERT(rb == NULL, "重复销毁安全");// NULL指针销毁mu_ring_buffer **null_ptr = NULL;mu_ring_buffer_destroy(null_ptr);TEST_ASSERT(1, "NULL指针销毁安全");
}// 测试用例2: 读者管理
void test_reader_management()
{printf("\n=== 测试读者管理 ===\n");mu_ring_buffer *rb = mu_ring_buffer_create(1024);TEST_ASSERT(rb != NULL, "创建缓冲区");// 添加读者mu_ring_buffer_reader *reader1 = mu_ring_buffer_add_reader(rb);TEST_ASSERT(reader1 != NULL, "添加第一个读者");mu_ring_buffer_reader *reader2 = mu_ring_buffer_add_reader(rb);TEST_ASSERT(reader2 != NULL, "添加第二个读者");// 添加最大数量的读者mu_ring_buffer_reader *readers[32];int added_count = 2;for (int i = 2; i < 32; i++){readers[i] = mu_ring_buffer_add_reader(rb);if (readers[i] != NULL){added_count++;}}TEST_ASSERT(added_count <= 32, "最多添加32个读者");// 尝试添加超出限制的读者mu_ring_buffer_reader *extra_reader = mu_ring_buffer_add_reader(rb);TEST_ASSERT(extra_reader == NULL, "超出限制的读者添加失败");// 删除读者mu_ring_buffer_remove_reader(rb, &reader1);TEST_ASSERT(reader1 == NULL, "删除读者后指针为NULL");// 删除后可以重新添加reader1 = mu_ring_buffer_add_reader(rb);TEST_ASSERT(reader1 != NULL, "删除后重新添加读者");// 清理mu_ring_buffer_remove_reader(rb, &reader1);mu_ring_buffer_remove_reader(rb, &reader2);for (int i = 2; i < added_count; i++){if (readers[i]){mu_ring_buffer_remove_reader(rb, &readers[i]);}}mu_ring_buffer_destroy(&rb);
}// 测试用例3: 基本读写功能
void test_basic_read_write()
{printf("\n=== 测试基本读写功能 ===\n");mu_ring_buffer *rb = mu_ring_buffer_create(1024);mu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(rb);// 写入数据mu_ring_buffer_pkt_hdr write_hdr = {10, 1, 0};char write_data[10] = "hello";int ret = mu_ring_buffer_write(rb, &write_hdr, write_data);TEST_ASSERT(ret == 0, "写入数据成功");// 读取数据mu_ring_buffer_pkt_hdr read_hdr;char read_data[20] = {0};ret = mu_ring_buffer_read(reader, &read_hdr, read_data, sizeof(read_data));TEST_ASSERT(ret == 0, "读取数据成功");TEST_ASSERT(read_hdr.length == write_hdr.length, "读取包头长度正确");TEST_ASSERT(read_hdr.type == write_hdr.type, "读取包头类型正确");TEST_ASSERT(strcmp(read_data, write_data) == 0, "读取数据内容正确");// 再次读取应该无数据ret = mu_ring_buffer_read(reader, &read_hdr, read_data, sizeof(read_data));TEST_ASSERT(ret == -2, "重复读取返回无数据");mu_ring_buffer_remove_reader(rb, &reader);mu_ring_buffer_destroy(&rb);
}// 测试用例4: 多个读者读取
void test_multiple_readers()
{printf("\n=== 测试多个读者读取 ===\n");mu_ring_buffer *rb = mu_ring_buffer_create(1024);mu_ring_buffer_reader *reader1 = mu_ring_buffer_add_reader(rb);mu_ring_buffer_reader *reader2 = mu_ring_buffer_add_reader(rb);// 写入数据mu_ring_buffer_pkt_hdr hdr = {5, 1, 0};char data[5] = "test";int ret = mu_ring_buffer_write(rb, &hdr, data);TEST_ASSERT(ret == 0, "写入数据成功");// 两个读者都能读到数据mu_ring_buffer_pkt_hdr read_hdr1, read_hdr2;char read_data1[10] = {0}, read_data2[10] = {0};ret = mu_ring_buffer_read(reader1, &read_hdr1, read_data1, sizeof(read_data1));TEST_ASSERT(ret == 0, "读者1读取成功");ret = mu_ring_buffer_read(reader2, &read_hdr2, read_data2, sizeof(read_data2));TEST_ASSERT(ret == 0, "读者2读取成功");TEST_ASSERT(strcmp(read_data1, read_data2) == 0, "两个读者读取数据一致");mu_ring_buffer_remove_reader(rb, &reader1);mu_ring_buffer_remove_reader(rb, &reader2);mu_ring_buffer_destroy(&rb);
}// 测试用例5: 环形缓冲区绕行
void test_ring_wraparound()
{printf("\n=== 测试环形缓冲区绕行 ===\n");// 创建较小的缓冲区便于测试绕行size_t capacity = 100;mu_ring_buffer *rb = mu_ring_buffer_create(capacity);mu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(rb);// 写入多个数据包,使其绕行char test_data[20];mu_ring_buffer_pkt_hdr hdr = {20, 1, 0};// 写入足够多的数据使其绕行for (int i = 0; i < 5; i++){snprintf(test_data, sizeof(test_data), "packet_%d", i);int ret = mu_ring_buffer_write(rb, &hdr, test_data);if (ret != 0){break; // 缓冲区满了}}// 读取所有数据int read_count = 0;mu_ring_buffer_pkt_hdr read_hdr;char read_data[30];while (1){int ret = mu_ring_buffer_read(reader, &read_hdr, read_data, sizeof(read_data));if (ret != 0){break;}read_count++;printf(" 读取到: %s\n", read_data);}TEST_ASSERT(read_count > 0, "成功读取数据包");mu_ring_buffer_remove_reader(rb, &reader);mu_ring_buffer_destroy(&rb);
}// 测试用例6: 缓冲区满的处理
void test_buffer_full()
{printf("\n=== 测试缓冲区满的处理 ===\n");size_t capacity = 64; // 较小的缓冲区mu_ring_buffer *rb = mu_ring_buffer_create(capacity);mu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(rb);// 写入大量数据直到缓冲区满mu_ring_buffer_pkt_hdr hdr = {10, 1, 0};char data[10] = "testdata";int write_count = 0;while (1){int ret = mu_ring_buffer_write(rb, &hdr, data);if (ret != 0){TEST_ASSERT(ret == -2, "缓冲区满时返回-2");break;}write_count++;}printf(" 成功写入 %d 个数据包\n", write_count);TEST_ASSERT(write_count > 0, "至少能写入一些数据");// 读取一些数据后应该能继续写入mu_ring_buffer_pkt_hdr read_hdr;char read_data[15];int ret = mu_ring_buffer_read(reader, &read_hdr, read_data, sizeof(read_data));TEST_ASSERT(ret == 0, "读取数据成功");// 现在应该能再写入一个包ret = mu_ring_buffer_write(rb, &hdr, data);TEST_ASSERT(ret == 0, "读取后能继续写入");mu_ring_buffer_remove_reader(rb, &reader);mu_ring_buffer_destroy(&rb);
}// 测试用例7: 错误参数处理
void test_error_handling()
{printf("\n=== 测试错误参数处理 ===\n");mu_ring_buffer *rb = mu_ring_buffer_create(1024);mu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(rb);mu_ring_buffer_pkt_hdr hdr = {10, 1, 0};char data[10] = "test";char read_data[10];// 测试NULL参数TEST_ASSERT(mu_ring_buffer_write(NULL, &hdr, data) == -1, "写入NULL缓冲区返回错误");TEST_ASSERT(mu_ring_buffer_write(rb, NULL, data) == -1, "写入NULL包头返回错误");TEST_ASSERT(mu_ring_buffer_write(rb, &hdr, NULL) == -1, "写入NULL数据返回错误");mu_ring_buffer_pkt_hdr zero_hdr = {0, 1, 0};TEST_ASSERT(mu_ring_buffer_write(rb, &zero_hdr, data) == -1, "写入零长度数据返回错误");// 测试读取错误参数TEST_ASSERT(mu_ring_buffer_read(NULL, &hdr, read_data, sizeof(read_data)) == -1, "NULL读者返回错误");TEST_ASSERT(mu_ring_buffer_read(reader, NULL, read_data, sizeof(read_data)) == -1, "NULL包头返回错误");TEST_ASSERT(mu_ring_buffer_read(reader, &hdr, NULL, sizeof(read_data)) == -1, "NULL数据缓冲区返回错误");// 测试数据缓冲区太小mu_ring_buffer_write(rb, &hdr, data);char small_buf[5];TEST_ASSERT(mu_ring_buffer_read(reader, &hdr, small_buf, sizeof(small_buf)) == -3, "数据缓冲区太小返回-3");mu_ring_buffer_remove_reader(rb, &reader);mu_ring_buffer_destroy(&rb);
}// 多线程测试结构
typedef struct
{mu_ring_buffer *rb;int thread_id;int iterations;int *success_count;pthread_mutex_t *mutex;
} thread_data_t;// 写线程函数
void *writer_thread(void *arg)
{thread_data_t *data = (thread_data_t *)arg;for (int i = 0; i < data->iterations; i++){mu_ring_buffer_pkt_hdr hdr = {16, (uint16_t)data->thread_id, 0};char write_data[16];snprintf(write_data, sizeof(write_data), "T%d_MSG_%d", data->thread_id, i);int ret = mu_ring_buffer_write(data->rb, &hdr, write_data);if (ret == 0){pthread_mutex_lock(data->mutex);(*data->success_count)++;pthread_mutex_unlock(data->mutex);}usleep(1000); // 1ms延迟}return NULL;
}// 读线程函数
void *reader_thread(void *arg)
{thread_data_t *data = (thread_data_t *)arg;mu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(data->rb);if (!reader){return NULL;}int read_count = 0;time_t start_time = time(NULL);while (time(NULL) - start_time < 5){ // 运行5秒mu_ring_buffer_pkt_hdr hdr;char read_data[32];int ret = mu_ring_buffer_read(reader, &hdr, read_data, sizeof(read_data));if (ret == 0){read_count++;printf(" 读者%d读取: %s\n", data->thread_id, read_data);}else if (ret == -2){usleep(10000); // 10ms延迟}}pthread_mutex_lock(data->mutex);(*data->success_count) += read_count;pthread_mutex_unlock(data->mutex);mu_ring_buffer_remove_reader(data->rb, &reader);return NULL;
}// 读者管理线程数据结构
typedef struct
{mu_ring_buffer *rb;int thread_id;volatile int *stop_flag;pthread_mutex_t *mutex;int *operation_count;
} reader_manager_data_t;// 读者管理线程函数
void *reader_manager_thread(void *arg)
{reader_manager_data_t *data = (reader_manager_data_t *)arg;mu_ring_buffer_reader *managed_readers[5] = {0}; // 最多管理5个读者int reader_count = 0;int cycle = 0;printf(" 读者管理线程启动\n");while (!(*data->stop_flag)){cycle++;// 每个周期添加1-3个读者int readers_to_add = 1 + (cycle % 3);for (int i = 0; i < readers_to_add && reader_count < 5; i++){managed_readers[reader_count] = mu_ring_buffer_add_reader(data->rb);if (managed_readers[reader_count]){reader_count++;printf(" 管理线程添加读者 #%d (总数: %d)\n", reader_count, reader_count);pthread_mutex_lock(data->mutex);(*data->operation_count)++;pthread_mutex_unlock(data->mutex);}}// 让读者工作一段时间 (200-500ms)int work_time = 200 + (cycle % 300);usleep(work_time * 1000);// 读取一些数据for (int i = 0; i < reader_count; i++){if (managed_readers[i]){mu_ring_buffer_pkt_hdr hdr;char read_data[32];// 尝试读取几次for (int j = 0; j < 3; j++){int ret = mu_ring_buffer_read(managed_readers[i], &hdr, read_data, sizeof(read_data));if (ret == 0){printf(" 管理的读者#%d读取: %s\n", i + 1, read_data);pthread_mutex_lock(data->mutex);(*data->operation_count)++;pthread_mutex_unlock(data->mutex);break;}else if (ret == -2){break; // 无数据}}}}// 删除一些读者 (保留至少1个)int readers_to_remove = (reader_count > 2) ? 1 + (cycle % 2) : 0;for (int i = 0; i < readers_to_remove && reader_count > 1; i++){// 从末尾删除读者int idx = reader_count - 1;if (managed_readers[idx]){mu_ring_buffer_remove_reader(data->rb, &managed_readers[idx]);managed_readers[idx] = NULL;reader_count--;printf(" 管理线程删除读者 (剩余: %d)\n", reader_count);pthread_mutex_lock(data->mutex);(*data->operation_count)++;pthread_mutex_unlock(data->mutex);}}// 周期间隔usleep(100000); // 100ms}// 清理剩余的读者printf(" 读者管理线程清理剩余读者...\n");for (int i = 0; i < reader_count; i++){if (managed_readers[i]){mu_ring_buffer_remove_reader(data->rb, &managed_readers[i]);printf(" 清理读者 #%d\n", i + 1);}}printf(" 读者管理线程结束\n");return NULL;
}void test_multithreading()
{printf("\n=== 测试多线程并发 ===\n");mu_ring_buffer *rb = mu_ring_buffer_create(8192); // 增大缓冲区pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;volatile int stop_flag = 0;// 创建写线程数据int write_success = 0;thread_data_t writer_data = {rb, 0, 200, &write_success, &mutex}; // 增加写入次数// 创建固定读线程数据int read_success = 0;thread_data_t reader_data1 = {rb, 1, 0, &read_success, &mutex};thread_data_t reader_data2 = {rb, 2, 0, &read_success, &mutex};// 创建读者管理线程数据int manager_operations = 0;reader_manager_data_t manager_data = {rb, 99, &stop_flag, &mutex, &manager_operations};// 创建线程pthread_t writer_tid, reader_tid1, reader_tid2, manager_tid;printf(" 启动固定读者线程...\n");pthread_create(&reader_tid1, NULL, reader_thread, &reader_data1);pthread_create(&reader_tid2, NULL, reader_thread, &reader_data2);printf(" 启动读者管理线程...\n");pthread_create(&manager_tid, NULL, reader_manager_thread, &manager_data);usleep(200000); // 让读者和管理线程先启动printf(" 启动写入线程...\n");pthread_create(&writer_tid, NULL, writer_thread, &writer_data);// 等待写线程完成pthread_join(writer_tid, NULL);printf(" 写入线程完成\n");// 让系统再运行2秒处理剩余数据sleep(2);// 通知停止stop_flag = 1;// 等待所有线程完成pthread_join(reader_tid1, NULL);pthread_join(reader_tid2, NULL);pthread_join(manager_tid, NULL);printf(" 所有线程已完成\n");printf(" 写入成功: %d\n", write_success);printf(" 固定读者读取成功: %d\n", read_success);printf(" 读者管理操作数: %d\n", manager_operations);TEST_ASSERT(write_success > 0, "写入成功");TEST_ASSERT(read_success > 0, "读取成功");TEST_ASSERT(manager_operations > 0, "读者管理操作成功");// 验证最终状态printf(" 验证最终缓冲区状态...\n");pthread_mutex_destroy(&mutex);mu_ring_buffer_destroy(&rb);printf(" 多线程并发测试完成\n");
}// 测试用例9: 大数据包测试
void test_large_packets()
{printf("\n=== 测试大数据包 ===\n");size_t capacity = 8192;mu_ring_buffer *rb = mu_ring_buffer_create(capacity);mu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(rb);// 测试接近最大大小的数据包size_t large_size = capacity / 2;char *large_data = malloc(large_size);memset(large_data, 'A', large_size);large_data[large_size - 1] = '\0';mu_ring_buffer_pkt_hdr hdr = {(uint32_t)large_size, 1, 0};int ret = mu_ring_buffer_write(rb, &hdr, large_data);TEST_ASSERT(ret == 0, "写入大数据包成功");// 读取大数据包char *read_buffer = malloc(large_size + 100);mu_ring_buffer_pkt_hdr read_hdr;ret = mu_ring_buffer_read(reader, &read_hdr, read_buffer, large_size + 100);TEST_ASSERT(ret == 0, "读取大数据包成功");TEST_ASSERT(read_hdr.length == hdr.length, "大数据包长度正确");// 测试超大数据包(应该失败)size_t too_large = capacity;mu_ring_buffer_pkt_hdr huge_hdr = {(uint32_t)too_large, 1, 0};ret = mu_ring_buffer_write(rb, &huge_hdr, large_data);TEST_ASSERT(ret == -2, "超大数据包写入失败");free(large_data);free(read_buffer);mu_ring_buffer_remove_reader(rb, &reader);mu_ring_buffer_destroy(&rb);
}// 测试用例10: 性能测试
void test_performance()
{printf("\n=== 性能测试 ===\n");mu_ring_buffer *rb = mu_ring_buffer_create(1024 * 1024); // 1MBmu_ring_buffer_reader *reader = mu_ring_buffer_add_reader(rb);const int iterations = 10000;mu_ring_buffer_pkt_hdr hdr = {64, 1, 0};char data[64];memset(data, 'X', sizeof(data));// 写入性能测试clock_t start = clock();int write_success = 0;for (int i = 0; i < iterations; i++){if (mu_ring_buffer_write(rb, &hdr, data) == 0){write_success++;}}clock_t write_time = clock() - start;printf(" 写入 %d/%d 个数据包,耗时: %ld ms\n",write_success, iterations, write_time * 1000 / CLOCKS_PER_SEC);// 读取性能测试start = clock();int read_success = 0;mu_ring_buffer_pkt_hdr read_hdr;char read_data[100];for (int i = 0; i < write_success; i++){if (mu_ring_buffer_read(reader, &read_hdr, read_data, sizeof(read_data)) == 0){read_success++;}}clock_t read_time = clock() - start;printf(" 读取 %d 个数据包,耗时: %ld ms\n",read_success, read_time * 1000 / CLOCKS_PER_SEC);TEST_ASSERT(write_success > iterations * 0.8, "写入成功率 > 80%");TEST_ASSERT(read_success == write_success, "读取数量与写入数量一致");mu_ring_buffer_remove_reader(rb, &reader);mu_ring_buffer_destroy(&rb);
}int main()
{printf("开始环形缓冲区测试...\n");test_basic_create_destroy();test_reader_management();test_basic_read_write();test_multiple_readers();test_ring_wraparound();test_buffer_full();test_error_handling();test_multithreading();test_large_packets();test_performance();printf("\n=== 测试结果 ===\n");printf("总测试数: %d\n", tests_run);printf("通过测试: %d\n", tests_passed);printf("失败测试: %d\n", tests_run - tests_passed);printf("通过率: %.2f%%\n", (float)tests_passed / tests_run * 100);return (tests_passed == tests_run) ? 0 : 1;
}