Zephyr内核对象-数据传递之Message Queue

本文简要说明Zephyr Message Queue的使用和实现。

Zephyr内核对象–数据传递对象简介一文中已经大概介绍了Message queue的特性,本文将继续说明F其使用和实现。

使用 链接到标题

API 链接到标题

Message queue的API有下面10个全部声明在kernel.h中,每个函数都有参数struct k_msgq msgq. 都是指该函数操作或者使用的msgq后面就不在单独列出说明 **void k_msgq_init(struct k_msgq q, char buffer, size_t msg_size, u32_t max_msgs); 作用:初始化一个msgq, 内存由使用者分配 buffer: msgq的buffer,需要由使用者分配,大小为msg_sizemax_msgs msg_size: msgq中每个message的大小 max_mags: msgq中最多容纳的message数量 *__syscall int k_msgq_alloc_init(struct k_msgq msgq, size_t msg_size, u32_t max_msgs); 作用:初始化一个msgq, 内存由msgq从线程池中分配 msg_size: msgq中每个message的大小 max_mags: msgq中最多容纳的message数量 *int k_msgq_cleanup(struct k_msgq msgq); 作用:释放k_msgq_alloc_init分配msgq内存 **__syscall int k_msgq_put(struct k_msgq msgq, void data, s32_t timeout); 作用:将message放入到msgq data:message数据 timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等 返回值:放入成功返回0 **__syscall int k_msgq_get(struct k_msgq msgq, void data, s32_t timeout); 作用:从msgq读出message data:message数据 timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等 返回值:读出成功返回0 **__syscall int k_msgq_peek(struct k_msgq msgq, void data); 作用:peek msgq data: peek到的message 返回: peek到数据返回0 *__syscall void k_msgq_purge(struct k_msgq msgq); 作用:清空msgq中的message *__syscall u32_t k_msgq_num_free_get(struct k_msgq msgq); 作用:获取msgq还可以放多少个message 返回值:空闲数目 **__syscall void k_msgq_get_attrs(struct k_msgq msgq, struct k_msgq_attrs attrs); 作用:获取msgq的信息,也就是message的大小,总数量和已使用数量,都放在struct k_msgq_attrs 内 *__syscall u32_t k_msgq_num_used_get(struct k_msgq msgq); 作用:获取msgq中有多少个message 返回值:message数目

使用说明 链接到标题

可以在ISR中put msgq.也可在ISR内get msgq,但不能等待。 msgq必须事先指定message的大小和个数。大小需要是2的幂对齐。 msgq用于异步传输小数据。msgq在读写时需要锁中断,因此不建议用来传输大数据。

初始化 链接到标题

初始化一个queue, 由用户分配内存

struct data_item_type {     //message的数据结构
    u32_t field1;
    u32_t field2;
    u32_t field3;
};

char __aligned(4) my_msgq_buffer[10 * sizeof(data_item_type)];
struct k_msgq my_msgq;

k_msgq_init(&my_msgq, my_msgq_buffer, sizeof(data_item_type), 10);

由message queue自己在线程池中分配

k_msgq_alloc_init(&my_msgq, sizeof(data_item_type), 10);

写入message 链接到标题

运行在线程或者ISR中写入message,ISR中写入时不能发生等待。示例如下

void producer_thread(void)
{
    struct data_item_t data;

    while (1) {
        /* create data item to send (e.g. measurement, timestamp, ...) */
        data = ...

        /* send data to consumers */
        while (k_msgq_put(&my_msgq, &data, K_NO_WAIT) != 0) {
            /* message queue is full: purge old data & try again */
            //这里purge并不是必要步骤,是否需要进行purge根据实际应用由用户自己选择
            k_msgq_purge(&my_msgq);
        }

        /* data item was successfully added to message queue */
    }
}

读message 链接到标题

可以将message从msgq读出, 之后msgq中不再有该message

void consumer_thread(void)
{
    struct data_item_t data;

    while (1) {
        /* get a data item */
        k_msgq_get(&my_msgq, &data, K_FOREVER);

        /* process data item */
        ...
    }
}

也可以只是peek,该message任然保留在msgq中

void consumer_thread(void)
{
    struct data_item_t data;

    while (1) {
        /* read a data item by peeking into the queue */
        if(0 == k_msgq_peek(&my_msgq, &data)){
            /* process data item */
        }
        ...
    }
}

实现 链接到标题

msgq的实现代码在zephyr/kernel/msg_q.c中,msgq是以ringbuffer的模式进行管理,在初始化的时候建立ringbuffer,读写数据时都是以固定的单位大小从ringbuffer内读写数据。 msgq的数据结构如下

struct k_msgq {
	_wait_q_t wait_q;       //wait_q用于控制msgq的等待
	struct k_spinlock lock;  //msgq多线程保护锁
	size_t msg_size;        // message的大小
	u32_t max_msgs;     // msgq最大容纳message的个数
	char *buffer_start;     //msgq ringbuffer的开始地址
	char *buffer_end;       //msgq ringbuffer的结束地址
	char *read_ptr;             //msgq ringbuffer的读指针
	char *write_ptr;        //msgq ringbuffer的写指针
	u32_t used_msgs;    //msgq中有效message的个数
	u8_t flags;                     //msgq的ringbuffer从线程池分配标志
};

示意图 如下,每读或者写一个message,读写指针就向前移动msg_size msgq

初始化/释放 链接到标题

初始化msgq就是对struct_msgq中的各成员进行初始化

void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
		 u32_t max_msgs)
{
    //初始化各成员
	msgq->msg_size = msg_size;
	msgq->max_msgs = max_msgs;
	msgq->buffer_start = buffer;
	msgq->buffer_end = buffer + (max_msgs * msg_size);
	msgq->read_ptr = buffer;
	msgq->write_ptr = buffer;
	msgq->used_msgs = 0;
	msgq->flags = 0;        // msgq ringbuffer是由使用者分配,这里设为0
	z_waitq_init(&msgq->wait_q);    //初始化msgq的wait_q
	msgq->lock = (struct k_spinlock) {};

	z_object_init(msgq);
}

k_msgq_alloc_init->z_impl_k_msgq_alloc_init, msgq内存从线程池中分配,再使用k_msgq_init初始化

int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
			    u32_t max_msgs)
{
	void *buffer;
	int ret;
	size_t total_size;

    //计算msg_size乘max_msgs,并检查是否溢出,实际调用的是__builtin_mul_overflow
	if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
		ret = -EINVAL;
	} else {
        //从线程池中分配msgq的ringbuffer 内存
		buffer = z_thread_malloc(total_size);
		if (buffer != NULL) {
            //初始化各变量
			k_msgq_init(msgq, buffer, msg_size, max_msgs);
            //使用K_MSGQ_FLAG_ALLOC在flags中标识ringbuffer是从线程池中分配
			msgq->flags = K_MSGQ_FLAG_ALLOC;
			ret = 0;
		} else {
			ret = -ENOMEM;
		}
	}

	return ret;
}

如果msgq是从线程池中分配的内存,可以使用k_msgq_cleanup将其释放

int k_msgq_cleanup(struct k_msgq *msgq)
{
    //如果还有thread在等待msgq,说明不能释放,退出
	CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
		return -EBUSY;
	}

    //判断alloc标志,并释放内存
	if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0) {
		k_free(msgq->buffer_start);
		msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
	}
	return 0;
}

mssage操作 链接到标题

写msgq 链接到标题

k_msgq_put->z_impl_k_msgq_put

int z_impl_k_msgq_put(struct k_msgq *msgq, void *data, s32_t timeout)
{
    //isr内写msgq不能等
	__ASSERT(!arch_is_in_isr() || timeout == K_NO_WAIT, "");

	struct k_thread *pending_thread;
	k_spinlock_key_t key;
	int result;

	key = k_spin_lock(&msgq->lock);


	if (msgq->used_msgs < msgq->max_msgs) {
		//msgq中ringbuffer有空间

        //检查是否有thread在等待读取msgq的message
		pending_thread = z_unpend_first_thread(&msgq->wait_q);
		if (pending_thread != NULL) {
			//有线程在等message,直接将该数据提供给等待线程
			(void)memcpy(pending_thread->base.swap_data, data,
			       msgq->msg_size);
			//等待线程拿到数据后,让等待线程ready,并重新调度
			arch_thread_return_value_set(pending_thread, 0);
			z_ready_thread(pending_thread);
			z_reschedule(&msgq->lock, key);
			return 0;
		} else {
			//没有线程需要数据,则将数据放入ringbuffer
			(void)memcpy(msgq->write_ptr, data, msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
            //更新msgq剩余的message数
			msgq->used_msgs++;
		}
		result = 0;
	} else if (timeout == K_NO_WAIT) {
		//msgq ringbuffer满,且不等待就立即退出
		result = -ENOMSG;
	} else {
		//msgq 满,将message放入swap_data,等待其它thread来读
		_current->base.swap_data = data;
		return z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
	}

	k_spin_unlock(&msgq->lock, key);

	return result;
}

读msgq 链接到标题

k_msgq_get->z_impl_k_msgq_get

int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, s32_t timeout)
{
        //isr内读msgq不能等
	__ASSERT(!arch_is_in_isr() || timeout == K_NO_WAIT, "");

	k_spinlock_key_t key;
	struct k_thread *pending_thread;
	int result;

	key = k_spin_lock(&msgq->lock);

	if (msgq->used_msgs > 0) {
		//ringbuffer中有数据,直接从ringbuffer中读出message
		(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		if (msgq->read_ptr == msgq->buffer_end) {
			msgq->read_ptr = msgq->buffer_start;
		}

        //更新msgq剩余的message数
		msgq->used_msgs--;

		//此时ringbuffer有空闲空间,如果有thead在等待写msgq,则在这里写入
		pending_thread = z_unpend_first_thread(&msgq->wait_q);
		if (pending_thread != NULL) {
			/* add thread's message to queue */
			(void)memcpy(msgq->write_ptr, pending_thread->base.swap_data,
			       msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
            //更新msgq剩余的message数
			msgq->used_msgs++;

			//等待写入msgq的thread在写入msgq后变为ready,并重新调度
			arch_thread_return_value_set(pending_thread, 0);
			z_ready_thread(pending_thread);
			z_reschedule(&msgq->lock, key);
			return 0;
		}
		result = 0;
	} else if (timeout == K_NO_WAIT) {
		/msgq ringbuffer空,且不等待就立即退出
		result = -ENOMSG;
	} else {
		//msgq 空,将message放入swap_data,等待其它thread来写
		_current->base.swap_data = data;
		return z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
	}

	k_spin_unlock(&msgq->lock, key);

	return result;
}

peek msgq 链接到标题

也可以通过peek读message,该方式不会将message从msgq的ringbuffer中删除 k_msgq_peek->z_impl_k_msgq_peek

int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
{
	k_spinlock_key_t key;
	int result;

	key = k_spin_lock(&msgq->lock);

	if (msgq->used_msgs > 0) {
		//ringbuffer中有数据直接copy出去
		(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
		result = 0;
	} else {
		//ringbuffer中有无数据返回错误
		result = -ENOMSG;
	}

	k_spin_unlock(&msgq->lock, key);

	return result;
}

清空msgq 链接到标题

当不需要msgq中的数据时可以使用k_msgq_purge清空 k_msgq_purge->z_impl_k_msgq_purge

void z_impl_k_msgq_purge(struct k_msgq *msgq)
{
	k_spinlock_key_t key;
	struct k_thread *pending_thread;

	key = k_spin_lock(&msgq->lock);

	//清空ringbuffer前,会先让等待读msgq的thead将message读走
	while ((pending_thread = z_unpend_first_thread(&msgq->wait_q)) != NULL) {
		arch_thread_return_value_set(pending_thread, -ENOMSG);
		z_ready_thread(pending_thread);
	}

    //复位 ringbuffer
	msgq->used_msgs = 0;
	msgq->read_ptr = msgq->write_ptr;

	z_reschedule(&msgq->lock, key);
}

获取msgq信息 链接到标题

获取msgq的信息函数实现很简单结合前面k_msgq的结构体很容易理解,这里就不再注释分析了

void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
{
	attrs->msg_size = msgq->msg_size;
	attrs->max_msgs = msgq->max_msgs;
	attrs->used_msgs = msgq->used_msgs;
}

static inline u32_t z_impl_k_msgq_num_free_get(struct k_msgq *msgq)
{
	return msgq->max_msgs - msgq->used_msgs;
}

static inline u32_t z_impl_k_msgq_num_used_get(struct k_msgq *msgq)
{
	return msgq->used_msgs;
}

参考 链接到标题

https://gcc.gnu.org/onlinedocs/gcc/Integer-Overflow-Builtins.html https://docs.zephyrproject.org/latest/reference/kernel/data_passing/message_queues.html