概述 链接到标题
MPSC_PBUF ( Multi Producer Single Consumer Packet Buffer ) 多生产者单消费者包缓冲器是一个循环缓冲区,MPSC_PBUF 中的数据以包为单位进行管理,并且包的大小不定,用户以包为单位从 MPSC_PBUF 中读取或者存储。为了减少数据在内存中的复制,生产数据分为两个步骤:分配数据包空间和填充数据并提交。消费数据包也分两个步骤:消费者请求数据包,使用数据并释放数据包。MPSC 的关键特性如下:
- 分配+提交用于生产
- 请求+释放用于消费
- 分配器会确保分配到的空间连续
- 分配空间的时候可以指定等待超时
- 在不等待超时的情况分配不到空间时处理策略:
- 覆盖: 丢弃最老的数据包, 丢弃时会有回调产生
- 不覆盖:直接返回失败
虽然 MPSC 也是使用环形缓冲区,但不同的是 MPSC 的数据是以包为单位在环形缓冲区中处理,因此除了同样拥有先进先出的特性,由于每个包之间是有明显界限的,也就可以支持多生产者。在 Zephyr 中 log 系统使用了 MPSC,不同模块作为多个生产者产生 log 信息,log 系统作为一个消费者,顺序的消费这些 log 进行顺序显示。
使用方法 链接到标题
MPSC 的 API 定义参考下面两个文件:
zephyr/include/zephyr/sys/mpsc_packet.h
zephyr/include/zephyr/sys/mpsc_pbuf.h
定义包结构 链接到标题
MPSC 进行包管理,MPSC 要求占用包的最开始 2bit 作为管理位,在代码上使用 MPSC_PBUF_HDR
来为 MPSC 预留包管理位,定义用户包结构
struct mpsc_sample_header{
MPSC_PBUF_HDR;
uint32_t length: 32 - MPSC_PBUF_HDR_BITS;
};
struct mpsc_sample_packet{
struct mpsc_sample_header hdr;
uint8_t data[];
}
//对于 mpsc 来说,只会操作 union mpsc_pbuf_generic,因此定义为共用方便操作
union mpsc_packet{
union mpsc_pbuf_generic hdr;
struct mpsc_sample_packet packet;
}
初始化 MPSC 链接到标题
在 MPSC 整个生命周期中使用唯一的 struct mpsc_pbuf_buffer
代表一个 MPSC,其内容由 MPSC 自己管理,初始化的时候使用 struct mpsc_pbuf_buffer_config
进行配置,内容如下:
struct mpsc_pbuf_buffer_config {
//MPSC 使用的 buf, 所有的包都从该 buf 内分配内存
uint32_t *buf;
//MPSC buf 的大小
uint32_t size;
//使用覆盖策略时,丢弃包的时候会调用该回调
mpsc_pbuf_notify_drop notify_drop;
//获取包的大小
mpsc_pbuf_get_wlen get_wlen;
//配置参数
uint32_t flags;
};
需要为 MPSC 提供一个丢包的回调函数,在丢包的时候会回调该函数,可以在该函数中筛选出必须使用的包进行处理,示例中我们只提示,不处理
static void drop ( const struct mpsc_pbuf_buffer *buffer, const union mpsc_pbuf_generic *item )
{
struct mpsc_sample_header *t_item = ( struct mpsc_sample_header * ) item;
LOG_INF ( "drop packet length %d", t_item->length ) ;
}
除了 MPSC_PBUF_HDR
外 MPSC 内部并不关心和访问 header 内任何数据,因此需要注册一个获取长度的回调用于 MPSC 内部获取数据包的长度
static uint32_t get_wlen ( const union mpsc_pbuf_generic *item )
{
struct mpsc_sample_header *t_item = ( struct mpsc_sample_header * ) item;
LOG_INF ( "get_wlen %d", t_item->length ) ;
return t_item->length;
}
执行初始化
//MPSC 的数据包 buffer
static uint32_t buf [128];
//mpsc 管理器
struct mpsc_pbuf_buffer buffer;
static int mpsc_sample_cmd ( const struct shell *shell, size_t argc, char **argv )
{
//初始化 mpsc_buf_cfg,设置 buffer 和 callback, 分配不到空间时运行 overwrite
struct mpsc_pbuf_buffer_config mpsc_buf_cfg = {
.buf = buf32,
.size = ARRAY_SIZE ( buf32 ) ,
.notify_drop = drop,
.get_wlen = get_wlen,
.flags = MPSC_PBUF_MODE_OVERWRITE;
};
mpsc_pbuf_init ( &buffer, &mpsc_buf_cfg ) ;
}
初始化的 flags
用户可配置参数为:
MPSC_PBUF_MODE_OVERWRITE
: 分配不到包空间时进行丢最老包的操作MPSC_PBUF_MAX_UTILIZATION
: 记录 MPSC buffer 最大使用到多少,一般 debug 用
生产者 链接到标题
生产者产生数据:
- 从 MPSC 申请空间
mpsc_pbuf_alloc
- 填充数据包
- 提交数据包
mpsc_pbuf_commit
需要注意的是从 MPSC 申请的空间是包含了包头的空间,如果你要通过 MPSC 传送 32 个字节的净数据的包,那么就需要申请 32+sizeof ( mpsc_sample_header )
个字节的空间
//传递 32 个字节净数据
uint32_t packet_len = 32+sizeof ( mpsc_sample_header ) ;
//从 MPSC 申请空间
union mpsc_packet *packet = mpsc_pbuf_alloc ( &buffer, packet_len, K_NO_WAIT ) ;
//填充数据
memset ( packet->packet.data, 0x5a, 32 ) ;
//提交数据
mpsc_pbuf_commit ( &buffer, packet ) ;
mpsc_pbuf_alloc
的特性:
- buffer 空间充足:申请到包空间
- buffer 空间不足:
- 有等待超时:超时时间内等待消费者释放包空间,如果释放的包空间小于请求的空间,将会再次等待 ( 这可能会实际导致等待时间几倍于指定的等待时间 )
- 无等待超时:当指定了
MPSC_PBUF_MODE_OVERWRITE
的情况下,进行丢老包处理,腾出空间放新包。如果没有指定MPSC_PBUF_MODE_OVERWRITE
则请求空间失败。
消费者 链接到标题
消费者消费数据:
- 从 MPSC 中请求一个数据包
mpsc_pbuf_claim
- 处理数据
- 通知 MPSC 释放数据包
mpsc_pbuf_free
//请求一个数据包
union mpsc_packet *packet_read = mpsc_pbuf_claim ( &buffer ) ;
if ( packet_read ) {
//处理数据包
shell_print ( shell, "Dump packet[%d]:", packet_read->packet.hdr.length ) ;
shell_hexdump ( shell, ( const uint8_t* ) packet_read->packet.data, packet_read->packet.hdr.length ) ;
//释放数据包
mpsc_pbuf_free ( &buffer, packet_read ) ;
}
其它 API 链接到标题
除了前面 5 个 API 外,MPSC 还提供其它的辅助 API。
下面三个 API 是简化生产者的操作,不必分请求和提交两步进行,但需要注意的是当 MPSC 中 buffer 不足且未指定 MPSC_PBUF_MODE_OVERWRITE
时,下面三个 API 都不会成功的将数据放入到 MPSC 中,在调用这三个 API 前可以先通过 mpsc_pbuf_get_utilization
进行空间确认
//向 buffer 中写单个字 ( 4byte 数据 ) word->raw
void mpsc_pbuf_put_word ( struct mpsc_pbuf_buffer *buffer,
const union mpsc_pbuf_generic word ) ;
//向 buffer 中写单个字 ( 4byte 数据 ) word->raw 和一个数据指针 data
void mpsc_pbuf_put_word_ext ( struct mpsc_pbuf_buffer *buffer,
const union mpsc_pbuf_generic word,
const void *data ) ;
//向 buffer 中写入指向 data 的 wlen 长度的数据
void mpsc_pbuf_put_data ( struct mpsc_pbuf_buffer *buffer,
const uint32_t *data, size_t wlen ) ;
下面三个 API 时获取 MPSC 的状态
//返回 MPSC 中是否还有可用包
bool mpsc_pbuf_is_pending ( struct mpsc_pbuf_buffer *buffer ) ;
//MPSC 中 buffer 使用情况, now 表示已使用,size 表示总数。
void mpsc_pbuf_get_utilization ( struct mpsc_pbuf_buffer *buffer,
uint32_t *size, uint32_t *now ) ;
//max 为 MPSC buffer 最大的使用量情况,当初始化 MPSC 没有指定 MPSC_PBUF_MAX_UTILIZATION 时将返回-ENOTSUP
int mpsc_pbuf_get_max_utilization ( struct mpsc_pbuf_buffer *buffer, uint32_t *max ) ;
参考 链接到标题
https://docs.zephyrproject.org/latest/kernel/data_structures/mpsc_pbuf.html