Zephyr线程阻塞和超时机制分析

本文分析Zephyr是如何实现对线程阻塞和超时机制。

概述 链接到标题

从Zephyr的代码结构上来看阻塞和超时的实现都放在sched.c中,但为了更容易的理解Zephyr内核的调度,本文将阻塞和超时抽取出来,分析Zephyr的同步/数据传递对线程的阻塞和超时实现,同时还分析k_sleep是如何让thread进入sleep并从sleep中恢复。

基础概念 链接到标题

在分析具体的阻塞实现和超时前先介绍以下基本概念: zephyr kernel有一个ready_q,所有就绪可以被调度的thread都放在这个ready_q中,其数据结构在include/kernel_structs.h中 ready_q的根据配置的调度方式可以是链表/红黑树/queue,本文我们只需要记住任务就绪时被放入到ready_q中即可。

阻塞等待 链接到标题

Zephyr内核对象–同步之信号量等文章(sem,msgq,poll,queue,stack,msgq)中我们可以看到,当这些内核对象都是依赖wait_q来实现对thread的等待. 可以在对应的数据结构struct k_mutex, struct k_sem, struct k_queue, struct k_stack,struct k_msgq,内都可以找到_wait_q_t wait_q, k_poll比较特殊,在Zephyr内核对象–同步之轮询一文中已经介绍poll的wait_q用的是局部变量,但等待的方式还是wait_q。 无论上面哪种内核对象最后都通过下面的代码等待内核对象就绪,大家可以自行查看相关内核对象的代码。

等待 链接到标题

这里以sem为例进行说明,阻塞等待timeout=K_FOREVER

int z_impl_k_sem_take(struct k_sem *sem, s32_t timeout)
{
	int ret = 0;

	k_spinlock_key_t key = k_spin_lock(&lock);

	if (likely(sem->count > 0U)) {
		sem->count--;
		k_spin_unlock(&lock, key);
		ret = 0;
		goto out;
	}

	if (timeout == K_NO_WAIT) {
		k_spin_unlock(&lock, key);
		ret = -EBUSY;
		goto out;
	}

	//等待sem的wait_q
	ret = z_pend_curr(&lock, key, &sem->wait_q, timeout);


	return ret;
}

等待的机制如下:

  1. 将thread从ready_q中移除,之后调度不会再调度到该thread
  2. 将thread加入到wait_q
  3. 重新调度,重调度将从该thread切到其它thread运行,该thread就保持在z_swap的上下文而被阻塞
int z_pend_curr(struct k_spinlock *lock, k_spinlock_key_t key,
	       _wait_q_t *wait_q, s32_t timeout)
{
	//将当前的thread加入到wait_q中
	pend(_current, wait_q, timeout);

	//进行调度
	return z_swap(lock, key);
}

static void pend(struct k_thread *thread, _wait_q_t *wait_q, s32_t timeout)
{
	LOCKED(&sched_spinlock) {
		add_to_waitq_locked(thread, wait_q);
	}

	//阻塞等待时timeout=K_FOREVER,add_thread_timeout_ms退化为不做任何动作
	add_thread_timeout_ms(thread, timeout);
}

static void add_to_waitq_locked(struct k_thread *thread, _wait_q_t *wait_q)
{
	//将thread从ready_q中移除
	unready_thread(thread);
	z_mark_thread_as_pending(thread);

	if (wait_q != NULL) {
		//更新thread等待的wait_q
		thread->base.pended_on = wait_q;
		//将thread放入wait_q
		z_priq_wait_add(&wait_q->waitq, thread);
	}
}

z_swap进行重调度z_swap->z_swap_irqlock->arch_swap对于arm 32位arch来说如下

int arch_swap(unsigned int key)
{
	_current->arch.basepri = key;
	_current->arch.swap_return_value = _k_neg_eagain;

	//触发pendsv,将在这里进入pendsv exception
	//在exception中保存该thread上下文,然后切到其它thread运行
	SCB->ICSR |= SCB_ICSR_PENDSVSET_Msk;

	//当该thread重新被调度时,会从这里继续执行,然后pend返回

	/* clear mask or enable all irqs to take a pendsv */
	irq_unlock(0);

	//恢复调度后的返回值,将会在解除等待时设置
	return _current->arch.swap_return_value;
}

解除等待 链接到标题

解除等待的机制如下

  1. 从wait_q中选取一个thread,将thread从wait_q中移除
  2. 设置thread 恢复时swap的换回值
  3. 将thread放回ready_q, 之后将会被kernel调度恢复运行
void z_impl_k_sem_give(struct k_sem *sem)
{
	k_spinlock_key_t key = k_spin_lock(&lock);

	//从wait_q中取出一个thread
	struct k_thread *thread = z_unpend_first_thread(&sem->wait_q);

	if (thread != NULL) {
		//为恢复的thread设置swap返回值
		arch_thread_return_value_set(thread, 0);

		//将thread放入ready_q等待被调度
		z_ready_thread(thread);
	} else {
		sem->count += (sem->count != sem->limit) ? 1U : 0U;
		handle_poll_events(sem);
	}

	z_reschedule(&lock, key);
}

超时等待 链接到标题

等待 链接到标题

和阻塞等待的流程一样,但是会建立一个timer

static void pend(struct k_thread *thread, _wait_q_t *wait_q, s32_t timeout)
{
	LOCKED(&sched_spinlock) {
		add_to_waitq_locked(thread, wait_q);
	}

	//建立超时timer
	add_thread_timeout_ms(thread, timeout);
}

建立的timer的代码如下,最后使用的z_add_timeout,详情可参考Zephyr内核Timeout模块简介

static void add_thread_timeout_ms(struct k_thread *thread, s32_t timeout)
{
	if (timeout != K_FOREVER) {
		s32_t ticks;

		if (timeout < 0) {
			timeout = 0;
		}
		//计算timer要等待的tick数
		ticks = _TICK_ALIGN + k_ms_to_ticks_ceil32(timeout);

		//建立超时timer, tick后超时
		z_add_thread_timeout(thread, ticks);
	}
}

static inline void z_add_thread_timeout(struct k_thread *th, s32_t ticks)
{
	//thread ticks超时后会调用z_thread_timeout
	z_add_timeout(&th->base.timeout, z_thread_timeout, ticks);
}

被其它thread解除等待 链接到标题

被其它thread解除等待的流程和阻塞式的大致一样,因为在超时前已经解除等待,因此timer的timeout不再需要,在thread从wait_q中取出thread时,会删除timer

struct k_thread *z_unpend_first_thread(_wait_q_t *wait_q)
{
	struct k_thread *thread = z_unpend1_no_timeout(wait_q);

	if (thread != NULL) {
		//将thread的timer移除
		(void)z_abort_thread_timeout(thread);
	}

	return thread;
}

超时解除等待 链接到标题

在等待时注册了timer,该timer超时后会调用z_thread_timeout

void z_thread_timeout(struct _timeout *timeout)
{
	struct k_thread *thread = CONTAINER_OF(timeout,
					       struct k_thread, base.timeout);

	//超时发生时尚未被其它thread解除等待pended_on将不为NULL
	if (thread->base.pended_on != NULL) {
		//将thread从wait_q中移除
		z_unpend_thread_no_timeout(thread);
	}
	z_mark_thread_as_started(thread);
	z_mark_thread_as_not_suspended(thread);
	//将thread放入ready_q
	z_ready_thread(thread);
}

此时thread因为超时发生被放入了ready_q,kernel将对其进行调度运行,当thread重新被调度时pend将会返回-EAGAIN表示timeout了 下面看下pend是如何返回-EAGAIN, 从前面的分析可以知道pend最后是调用arch_swap,进入pending状态,恢复调度后又从arch_swap继续运行并返回

int arch_swap(unsigned int key)
{
	_current->arch.basepri = key;
	//在进入pending前会个swap_return_value设置为_k_neg_eagain,该值就是-EAGAIN
	_current->arch.swap_return_value = _k_neg_eagain;

	//触发pendsv,将在这里进入pendsv exception
	//在exception中保存该thread上下文,然后切到其它thread运行
	SCB->ICSR |= SCB_ICSR_PENDSVSET_Msk;

	//当该thread重新被调度时,会从这里继续执行,由于超时恢复调度,然后pend返回

	/* clear mask or enable all irqs to take a pendsv */
	irq_unlock(0);

	//恢复调度后的返回值,如果是超时解除等待swap_return_value不会被其它thread用arch_thread_return_value_set设置
	//因此这里讲返回_k_neg_eagain,也就是-EAGAIN
	return _current->arch.swap_return_value;
}

k_sleep 链接到标题

Thread可以通过k_sleep进入睡眠,通过k_wakeup唤醒睡眠的thread. k_sleep的本质和pending一样,就是将thread从ready_q中移除,让kernel不对齐进行调度,当timeout后或者其它thread对其进行wakeup时,再将该thread放回ready_q.

超时睡眠 链接到标题

流程代码分析如下

s32_t z_impl_k_sleep(int ms)
{
	s32_t ticks;
	//ISR中不能做sleep
	__ASSERT(!arch_is_in_isr(), "");

	//如果是永远sleep,不用创建timer,直接挂起当前thread
	if (ms == K_FOREVER) {
		k_thread_suspend(_current);
		return K_FOREVER;
	}

	//超时睡眠
	ticks = k_ms_to_ticks_ceil32(ms);
	ticks = z_tick_sleep(ticks);
	return k_ticks_to_ms_floor64(ticks);
}

static s32_t z_tick_sleep(s32_t ticks)
{

	u32_t expected_wakeup_time;

	__ASSERT(!arch_is_in_isr(), "");


	//如果是sleep 0,就是k_yield
	if (ticks == 0) {
		k_yield();
		return 0;
	}

	ticks += _TICK_ALIGN;
	expected_wakeup_time = ticks + z_tick_get_32();


	struct k_spinlock local_lock = {};
	k_spinlock_key_t key = k_spin_lock(&local_lock);

	//将thread从ready_q中移除
	z_remove_thread_from_ready_q(_current);

	//为thread添加timer,ticks后超时,会将thread恢复到ready_q中
	//这个机制和超时等待的一样,不再展开分析
	z_add_thread_timeout(_current, ticks);
	z_mark_thread_as_suspended(_current);

	(void)z_swap(&local_lock, key);

	__ASSERT(!z_is_thread_state_set(_current, _THREAD_SUSPENDED), "");

	ticks = expected_wakeup_time - z_tick_get_32();
	if (ticks > 0) {
		return ticks;
	}

	return 0;
}

可以看到k_sleep和超时等待的机制一样,但k_sleep不需要wait_q,thread sleep时将thread从ready_q中移除,将自己加入到timer中,timer到了后又将thread放入ready_q.

永久睡眠 链接到标题

从前面分析看永久睡眠调用的是k_thread_suspend->z_impl_k_thread_suspend

void z_impl_k_thread_suspend(struct k_thread *thread)
{
	//如果thread正在等待超时,放弃等待
	(void)z_abort_thread_timeout(thread);

	LOCKED(&sched_spinlock) {
		if (z_is_thread_queued(thread)) {
			//将thread从ready_q中移除
			_priq_run_remove(&_kernel.ready_q.runq, thread);
			z_mark_thread_as_not_queued(thread);
		}
		z_mark_thread_as_suspended(thread);
		update_cache(thread == _current);
	}

	if (thread == _current) {
		z_reschedule_unlocked();
	}
}

唤醒 链接到标题

在thread被睡眠时可以通过wakeup唤醒

void z_impl_k_wakeup(k_tid_t thread)
{
	if (z_is_thread_pending(thread)) {
		return;
	}

	//如果是timeout的,先删除timer
	if (z_abort_thread_timeout(thread) < 0) {
		/* Might have just been sleeping forever */
		if (thread->base.thread_state != _THREAD_SUSPENDED) {
			return;
		}
	}

	//将thread放入ready_q
	z_mark_thread_as_not_suspended(thread);
	z_ready_thread(thread);


	if (!arch_is_in_isr()) {
		z_reschedule_unlocked();
	}
}

总结 链接到标题

前面说了这么多,可能有点混乱,下面一张图总结一下 waitq kernel只有一个ready_q,可被调度的thread都被放到ready_q中 每个需要等待的内核对象都有一个wait_q, 当thread等待该内核对象时,该thread会从ready_q中移除,放入到等待内核对象的wait_q中 当内核对象有效时,会将thread从wait_q中移除又放回ready_q 进行k_sleep的thread,没有wait_q,会将thread从read_q中移除,并加入到timeout_list中,当timeout到了后,timeout callback又会将thread加入到ready_q中

关于wait_q/ready_q 链接到标题

wait_q和ready_q的管理和调度有关,不同的调度配置会有不同的实现方式,主要是涉及thread加入q和移除q的方法,本文就不再展开。