概述 链接到标题

MPSC_PBUF ( Multi Producer Single Consumer Packet Buffer ) 多生产者单消费者包缓冲器是一个循环缓冲区,MPSC_PBUF 中的数据以包为单位进行管理,并且包的大小不定,用户以包为单位从 MPSC_PBUF 中读取或者存储。为了减少数据在内存中的复制,生产数据分为两个步骤:分配数据包空间和填充数据并提交。消费数据包也分两个步骤:消费者请求数据包,使用数据并释放数据包。MPSC 的关键特性如下:

  • 分配+提交用于生产
  • 请求+释放用于消费
  • 分配器会确保分配到的空间连续
  • 分配空间的时候可以指定等待超时
  • 在不等待超时的情况分配不到空间时处理策略:
    • 覆盖: 丢弃最老的数据包, 丢弃时会有回调产生
    • 不覆盖:直接返回失败

虽然 MPSC 也是使用环形缓冲区,但不同的是 MPSC 的数据是以包为单位在环形缓冲区中处理,因此除了同样拥有先进先出的特性,由于每个包之间是有明显界限的,也就可以支持多生产者。在 Zephyr 中 log 系统使用了 MPSC,不同模块作为多个生产者产生 log 信息,log 系统作为一个消费者,顺序的消费这些 log 进行顺序显示。

使用方法 链接到标题

MPSC 的 API 定义参考下面两个文件:

  • zephyr/include/zephyr/sys/mpsc_packet.h
  • zephyr/include/zephyr/sys/mpsc_pbuf.h

定义包结构 链接到标题

MPSC 进行包管理,MPSC 要求占用包的最开始 2bit 作为管理位,在代码上使用 MPSC_PBUF_HDR 来为 MPSC 预留包管理位,定义用户包结构

struct mpsc_sample_header{
    MPSC_PBUF_HDR;
    uint32_t length: 32 - MPSC_PBUF_HDR_BITS;
};

struct mpsc_sample_packet{
    struct mpsc_sample_header hdr;
    uint8_t data[];
}

//对于 mpsc 来说,只会操作 union mpsc_pbuf_generic,因此定义为共用方便操作
union mpsc_packet{
    union mpsc_pbuf_generic hdr;
    struct mpsc_sample_packet packet;
}

初始化 MPSC 链接到标题

在 MPSC 整个生命周期中使用唯一的 struct mpsc_pbuf_buffer 代表一个 MPSC,其内容由 MPSC 自己管理,初始化的时候使用 struct mpsc_pbuf_buffer_config 进行配置,内容如下:

struct mpsc_pbuf_buffer_config {
    //MPSC 使用的 buf, 所有的包都从该 buf 内分配内存
    uint32_t *buf;

    //MPSC buf 的大小
    uint32_t size;

    //使用覆盖策略时,丢弃包的时候会调用该回调
    mpsc_pbuf_notify_drop notify_drop;

    //获取包的大小
    mpsc_pbuf_get_wlen get_wlen;

    //配置参数
    uint32_t flags;
};

需要为 MPSC 提供一个丢包的回调函数,在丢包的时候会回调该函数,可以在该函数中筛选出必须使用的包进行处理,示例中我们只提示,不处理

static void drop ( const struct mpsc_pbuf_buffer *buffer, const union mpsc_pbuf_generic *item )
{
    struct mpsc_sample_header *t_item = ( struct mpsc_sample_header * ) item;
    LOG_INF ( "drop packet length %d", t_item->length ) ;
}

除了 MPSC_PBUF_HDR 外 MPSC 内部并不关心和访问 header 内任何数据,因此需要注册一个获取长度的回调用于 MPSC 内部获取数据包的长度

static uint32_t get_wlen ( const union mpsc_pbuf_generic *item )
{
    struct mpsc_sample_header *t_item = ( struct mpsc_sample_header * ) item;
    LOG_INF ( "get_wlen %d", t_item->length ) ;
    return t_item->length;
}

执行初始化

//MPSC 的数据包 buffer
static uint32_t buf [128];

//mpsc 管理器
struct mpsc_pbuf_buffer buffer;

static int mpsc_sample_cmd ( const struct shell *shell, size_t argc, char **argv )
{
    //初始化 mpsc_buf_cfg,设置 buffer 和 callback, 分配不到空间时运行 overwrite
    struct mpsc_pbuf_buffer_config mpsc_buf_cfg = {
        .buf = buf32,
        .size = ARRAY_SIZE ( buf32 ) ,
        .notify_drop = drop,
        .get_wlen = get_wlen,
        .flags = MPSC_PBUF_MODE_OVERWRITE;
    };

    mpsc_pbuf_init ( &buffer, &mpsc_buf_cfg ) ;
}

初始化的 flags 用户可配置参数为:

  • MPSC_PBUF_MODE_OVERWRITE : 分配不到包空间时进行丢最老包的操作
  • MPSC_PBUF_MAX_UTILIZATION : 记录 MPSC buffer 最大使用到多少,一般 debug 用

生产者 链接到标题

生产者产生数据:

  • 从 MPSC 申请空间 mpsc_pbuf_alloc
  • 填充数据包
  • 提交数据包 mpsc_pbuf_commit

需要注意的是从 MPSC 申请的空间是包含了包头的空间,如果你要通过 MPSC 传送 32 个字节的净数据的包,那么就需要申请 32+sizeof ( mpsc_sample_header ) 个字节的空间

//传递 32 个字节净数据
uint32_t packet_len = 32+sizeof ( mpsc_sample_header ) ;
//从 MPSC 申请空间
union mpsc_packet *packet = mpsc_pbuf_alloc ( &buffer, packet_len, K_NO_WAIT ) ;
//填充数据
memset ( packet->packet.data, 0x5a, 32 ) ;
//提交数据
mpsc_pbuf_commit ( &buffer, packet ) ;

mpsc_pbuf_alloc 的特性:

  • buffer 空间充足:申请到包空间
  • buffer 空间不足:
    • 有等待超时:超时时间内等待消费者释放包空间,如果释放的包空间小于请求的空间,将会再次等待 ( 这可能会实际导致等待时间几倍于指定的等待时间 )
    • 无等待超时:当指定了 MPSC_PBUF_MODE_OVERWRITE 的情况下,进行丢老包处理,腾出空间放新包。如果没有指定 MPSC_PBUF_MODE_OVERWRITE 则请求空间失败。

消费者 链接到标题

消费者消费数据:

  • 从 MPSC 中请求一个数据包 mpsc_pbuf_claim
  • 处理数据
  • 通知 MPSC 释放数据包 mpsc_pbuf_free
//请求一个数据包
union mpsc_packet *packet_read = mpsc_pbuf_claim ( &buffer ) ;

if ( packet_read ) {
    //处理数据包
    shell_print ( shell, "Dump packet[%d]:", packet_read->packet.hdr.length ) ;
    shell_hexdump ( shell, ( const uint8_t* ) packet_read->packet.data, packet_read->packet.hdr.length ) ;

    //释放数据包
    mpsc_pbuf_free ( &buffer, packet_read ) ;
}

其它 API 链接到标题

除了前面 5 个 API 外,MPSC 还提供其它的辅助 API。

下面三个 API 是简化生产者的操作,不必分请求和提交两步进行,但需要注意的是当 MPSC 中 buffer 不足且未指定 MPSC_PBUF_MODE_OVERWRITE 时,下面三个 API 都不会成功的将数据放入到 MPSC 中,在调用这三个 API 前可以先通过 mpsc_pbuf_get_utilization 进行空间确认

//向 buffer 中写单个字 ( 4byte 数据 ) word->raw
void mpsc_pbuf_put_word ( struct mpsc_pbuf_buffer *buffer,
            const union mpsc_pbuf_generic word ) ;

//向 buffer 中写单个字 ( 4byte 数据 ) word->raw 和一个数据指针 data
void mpsc_pbuf_put_word_ext ( struct mpsc_pbuf_buffer *buffer,
                const union mpsc_pbuf_generic word,
                const void *data ) ;

//向 buffer 中写入指向 data 的 wlen 长度的数据
void mpsc_pbuf_put_data ( struct mpsc_pbuf_buffer *buffer,
            const uint32_t *data, size_t wlen ) ;

下面三个 API 时获取 MPSC 的状态

//返回 MPSC 中是否还有可用包
bool mpsc_pbuf_is_pending ( struct mpsc_pbuf_buffer *buffer ) ;

//MPSC 中 buffer 使用情况, now 表示已使用,size 表示总数。
void mpsc_pbuf_get_utilization ( struct mpsc_pbuf_buffer *buffer,
                   uint32_t *size, uint32_t *now ) ;

//max 为 MPSC buffer 最大的使用量情况,当初始化 MPSC 没有指定 MPSC_PBUF_MAX_UTILIZATION 时将返回-ENOTSUP
int mpsc_pbuf_get_max_utilization ( struct mpsc_pbuf_buffer *buffer, uint32_t *max ) ;

参考 链接到标题

https://docs.zephyrproject.org/latest/kernel/data_structures/mpsc_pbuf.html