Nuttx - IPC

从信号量到消息队列、从共享内存到管道,全面解析 NuttX KERNEL 模式下的 IPC 机制,深入剖析消息队列、共享内存和管道的内核实现。

本文回答以下问题:NuttX 提供了哪些 IPC 机制,各自适用什么场景?消息队列如何实现带优先级的消息排序和阻塞等待?共享内存在 KERNEL 模式下如何通过 MMU 将同一物理页映射到多个进程?管道的环形缓冲区如何实现读写阻塞?读完后,你将能够选择合适的 IPC 机制,并从源码级别理解其内核实现。


1. 开篇:为什么需要多种 IPC 机制?

在 KERNEL 模式下,每个进程有独立的虚拟地址空间——进程 A 的指针对进程 B 毫无意义。进程间要通信,必须通过内核提供的受控通道。不同场景需要不同特征的通道:

需求 适合的 IPC
互斥访问共享资源 信号量 / 互斥锁
传递结构化消息(带优先级) 消息队列
高性能大数据共享(零拷贝) 共享内存
流式字节数据传递 管道 / FIFO
异步通知 信号(已在前文覆盖)

NuttX 实现了 POSIX 标准(IEEE Std 1003.1)定义的全部 IPC 机制。NuttX 官方文档(https://nuttx.apache.org/docs/latest/reference/user/index.html)提供了各 API 的用户手册。本文先概览各机制的使用方式,再深入消息队列、共享内存和管道的内核实现。


2. IPC 机制使用概览

以下代码示例为用户应用层代码(非内核源码),展示各 IPC 机制的标准 POSIX API 用法。

2.1 信号量(Semaphore)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <semaphore.h>

sem_t sem;
sem_init(&sem, 0, 1); /* 初始值 1(互斥锁用法)*/
sem_wait(&sem); /* P 操作:计数-1,若 <0 则阻塞 */
/* ... 临界区 ... */
sem_post(&sem); /* V 操作:计数+1,唤醒等待者 */
sem_destroy(&sem);

/* 命名信号量(跨进程)*/
sem_t *sem = sem_open("/mysem", O_CREAT, 0666, 1);
sem_wait(sem);
sem_post(sem);
sem_close(sem);

NuttX 信号量支持优先级继承(已在调度博客中讲解),阻塞的任务按优先级排序在 g_waitingforsemaphore 链表中。

2.2 互斥锁(Mutex)

互斥锁用于同一进程内的线程间互斥,不支持跨进程共享。NuttX 的 mutex 接受 PTHREAD_PROCESS_SHARED 属性但内部忽略它——该属性存在是为了 POSIX 兼容性,不改变实际行为。

1
2
3
4
5
6
7
#include <pthread.h>

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_lock(&mtx);
/* ... 临界区 ... */
pthread_mutex_unlock(&mtx);

NuttX 的 mutex 在内部基于信号量实现(sched/mutex/),增加了所有权检查(按 PID 判断)、递归计数(PTHREAD_MUTEX_RECURSIVE)和优先级继承。

如果需要在不同进程间做互斥同步,请用命名信号量 sem_open()——它是 NuttX 官方支持的跨进程 IPC 机制。

2.3 消息队列(Message Queue)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <mqueue.h>

/* 创建/打开 */
struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = 256 };
mqd_t mq = mq_open("/myqueue", O_CREAT | O_RDWR, 0666, &attr);

/* 发送(带优先级,0-255,高优先级先被接收)*/
mq_send(mq, "hello", 6, 100); /* priority = 100 */
mq_send(mq, "urgent", 7, 200); /* priority = 200,将排在前面 */

/* 接收(总是取出最高优先级的消息)*/
char buf[256];
unsigned int prio;
mq_receive(mq, buf, 256, &prio); /* 先收到 "urgent"(prio=200) */

mq_close(mq);
mq_unlink("/myqueue");

2.4 共享内存(Shared Memory)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <sys/shm.h>

/* System V 风格 */
int shmid = shmget(1234, 4096, IPC_CREAT | 0666);
void *ptr = shmat(shmid, NULL, 0);
/* 直接读写 ptr,另一个进程 shmat 同一 shmid 可看到相同内容 */
shmdt(ptr);
shmctl(shmid, IPC_RMID, NULL);

/* POSIX 风格(通过 VFS)*/
int fd = shm_open("/myshm", O_CREAT | O_RDWR, 0666);
ftruncate(fd, 4096);
void *ptr = mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
/* ... use ptr ... */
munmap(ptr, 4096);
shm_unlink("/myshm");

注意:共享内存仅在 CONFIG_BUILD_KERNEL 模式下可用——它需要 MMU 和地址环境支持。

2.5 管道与 FIFO

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <unistd.h>

/* 匿名管道(父子进程间)*/
int pipefd[2];
pipe(pipefd);
/* pipefd[0] = 读端, pipefd[1] = 写端 */
write(pipefd[1], "data", 5);
read(pipefd[0], buf, 5);

/* 命名管道 FIFO(任意进程间)*/
mkfifo("/tmp/myfifo", 0666);
int fd = open("/tmp/myfifo", O_WRONLY); /* 阻塞直到有读端打开 */
write(fd, "data", 5);

概览完毕。下面深入消息队列的内核实现。


3. 深入:消息队列内核实现

消息队列提供三个关键特性:消息边界保持(每条消息完整交付,不会像管道那样被截断)、优先级排序(紧急消息可以插队)、内核托管(发送方不需要等接收方准备好)。

本章将逐一回答四个核心问题:

  1. mq_open 做了什么?——如何通过 VFS inode 创建和查找消息队列
  2. 用户空间怎么到内核空间?——SVC 系统调用从触发到分派的完整链路
  3. 消息发送和接收怎么实现?——mq_send/mq_receive 的逐步骤分析
  4. 如何管理阻塞与唤醒?——优先级等待队列 + 超时看门狗的协作机制

3.1 核心数据结构

消息队列由三层数据结构组成:消息节点(mqueue_msg_s)存储单条消息;队列本体(mqueue_inode_s)管理消息链表和容量;公共头部(mqueue_cmn_s)管理阻塞任务。mqueue_inode_s 内嵌 mqueue_cmn_s,而 mqueue_cmn_s 的两个 dq_queue_t 头各自链接着按优先级排序的 TCB。mqueue_inode_s 作为 inode->i_private 挂在 VFS 中,是内核操作消息队列的唯一入口。

下图展示了这些结构体的嵌套关系、字段布局以及”通过 VFS inode 找到 msgq → 通过 waitfornotempty/waitfornotfull 管理阻塞 TCB → 通过 msglist 存取消息”的完整数据流:

mq_data_struct

文件:sched/mqueue/mqueue.h:68-79

1
2
3
4
5
6
7
8
struct mqueue_msg_s
{
struct list_node node; /* 链表节点 */
uint8_t type; /* 分配类型:预分配/动态/中断保留 */
uint8_t priority; /* 消息优先级(0 ~ MQ_PRIO_MAX-1)*/
uint16_t msglen; /* 消息数据长度 */
char mail[1]; /* 可变长消息体(flexible array)*/
};

文件:include/nuttx/mqueue.h:103-131

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct mqueue_cmn_s
{
dq_queue_t waitfornotempty; /* 等待队列非空的任务链表(接收者阻塞在此)*/
dq_queue_t waitfornotfull; /* 等待队列非满的任务链表(发送者阻塞在此)*/
int16_t nwaitnotfull; /* 等待非满的任务数 */
int16_t nwaitnotempty; /* 等待非空的任务数 */
};

struct mqueue_inode_s
{
struct mqueue_cmn_s cmn; /* 公共头部(阻塞队列在此)*/
FAR struct inode *inode; /* 对应的 VFS inode */
struct list_node msglist; /* 按优先级降序的消息链表 */
int16_t maxmsgs; /* 队列最大消息数 */
int16_t nmsgs; /* 当前消息数 */
uint16_t maxmsgsize; /* 单条消息最大字节数 */
FAR struct pollfd *fds[CONFIG_FS_MQUEUE_NPOLLWAITERS]; /* poll 等待者 */
};

消息链表 msglist 始终按优先级降序排列(高优先级在前),mq_receive() 从链表头取即可得到最高优先级消息——O(1)取出。

3.2 从用户空间到内核空间:SVC 系统调用路径

在 KERNEL 或 PROTECTED 构建模式下,用户进程运行在非特权态。mq_openmq_sendmq_receive 等 POSIX API 由 tools/mksyscall.c 自动生成代理函数,通过 sys_call4 宏触发 svc 指令陷入内核。内核侧的 arm_syscall() 根据系统调用号查 g_stublookup[] 表,找到对应的 STUB 函数做类型转换后调用真正的内核实现。

完整调用链路(以 mq_send 为例):

1
2
3
4
5
用户进程 mq_send()
→ sys_call4(SYS_mq_send, ...) [用户态代理,mksyscall 自动生成]
→ "svc" 指令 [触发 SVC 异常,切换到特权态]
→ arm_syscall() → dispatch_syscall()→ g_stublookup[] → STUB_mq_send()
→ mq_send() [内核实现,sched/mqueue/mq_send.c]

下图展示了从用户态到内核态的完整桥接过程:

svc

3.3 mq_open():消息队列的创建与打开

mq_open 的核心工作是:将用户给定的名字(如 "/myqueue")映射到 VFS 中的一个 inode,该 inode 的类型为消息队列(FSNODEFLAG_TYPE_MQUEUE),其 i_private 指向一个 mqueue_inode_s 实例。

完整流程(文件:fs/mqueue/mq_open.c:160-344,函数 file_mq_vopen):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static int file_mq_vopen(FAR struct file *mq, FAR const char *mq_name,
int oflags, mode_t umask, va_list ap, int *created)
{
FAR struct inode *inode;
FAR struct mqueue_inode_s *msgq;
struct inode_search_s desc;
char fullpath[MAX_MQUEUE_PATH];
mode_t mode = 0;
FAR struct mq_attr *attr = NULL;

/* Step 1:如果指定了 O_CREAT,从 va_list 中提取 mode 和 attr */
if ((oflags & O_CREAT) != 0) {
mode = va_arg(ap, mode_t);
attr = va_arg(ap, FAR struct mq_attr *);
}

/* Step 2:构造完整路径。所有消息队列统一在 CONFIG_FS_MQUEUE_VFS_PATH 下 */
while (*mq_name == '/') mq_name++; /* 跳过前导 '/' */
snprintf(fullpath, MAX_MQUEUE_PATH,
CONFIG_FS_MQUEUE_VFS_PATH "/%s", mq_name); /* 例如 /var/mqueue/myqueue */

/* Step 3:进入临界区——保证 inode 查找和创建是原子的 */
flags = enter_critical_section();

/* Step 4:在 VFS 中查找该路径 */
SETUP_SEARCH(&desc, fullpath, false);
ret = inode_find(&desc);

if (ret >= 0) {
/* 队列已存在:验证类型是 MQUEUE,检查 O_EXCL,建立 file 关联 */
inode = desc.node;
if (!INODE_IS_MQUEUE(inode)) return -ENXIO;
if ((oflags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL)) return -EEXIST;
mq->f_inode = inode;
mq->f_oflags = oflags;
} else {
/* 不存在 + O_CREAT:创建新 inode 和新 mqueue_inode_s */
if ((oflags & O_CREAT) == 0) return -ENOENT;

inode_reserve(fullpath, mode, &inode); /* 在 VFS 中预留 inode 节点 */
nxmq_alloc_msgq(attr, &msgq); /* 分配并初始化 mqueue_inode_s */

/* 绑定关系:file → inode → msgq */
mq->f_inode = inode;
mq->f_oflags = oflags;
INODE_SET_MQUEUE(inode); /* 标记 inode 类型为消息队列 */
inode->u.i_ops = &g_nxmq_fileops; /* 设置文件操作表 */
inode->i_private = msgq; /* 关键:msgq 挂在 inode 下 */
msgq->inode = inode; /* 反向引用 */
atomic_fetch_add(&inode->i_crefs, 1); /* 初始引用计数 = 1 */
}

leave_critical_section(flags);
return OK;
}

Step 6:分配 fdfile_mq_vopen 返回后,调用方 nxmq_vopenmq_open.c:346-374)通过 file_dup() 为这个 struct file 分配用户态文件描述符 fd,返回给用户。因此 mq_open 的返回值 mqd_t 本质上就是一个 fd(整数)。

小结mq_open 做了五件事:

  1. 拼接完整 VFS 路径(CONFIG_FS_MQUEUE_VFS_PATH "/" name
  2. 在 VFS 中查找或创建 inode
  3. 分配 mqueue_inode_s,初始化 maxmsgsmaxmsgsize 和阻塞队列
  4. msgq 绑定到 inode->i_private
  5. 返回 fd 给用户

3.4 消息发送:mq_send() 完整链路

一条消息从用户进程到达内核消息队列,经历系统调用(3.2 节)后的内核执行路径如下。

内核入口(文件:sched/mqueue/mq_send.c:686-746):

SYS_syscall 路径进来后,mq_send() 本身只做取消点标记和错误码包装:

1
2
3
4
5
6
7
8
int mq_send(mqd_t mqdes, FAR const char *msg, size_t msglen, unsigned int prio)
{
enter_cancellation_point();
ret = nxmq_send(mqdes, msg, msglen, prio); /* 委托给内核内部函数 */
if (ret < 0) { set_errno(-ret); ret = ERROR; }
leave_cancellation_point();
return ret;
}

nxmq_send() 通过 file_get(mqdes, &filep) 将 fd 转换为 struct file *,再调用核心函数 file_mq_timedsend_internal()

核心发送逻辑(文件:sched/mqueue/mq_send.c:274-370,函数 file_mq_timedsend_internal):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int file_mq_timedsend_internal(FAR struct file *mq,
FAR const char *msg, size_t msglen, unsigned int prio, ...)
{
msgq = mq->f_inode->i_private; /* Step 0:从 inode 取得 msgq */

mqmsg = nxmq_alloc_msg(msglen); /* Step 1:分配消息节点 */
memcpy(mqmsg->mail, msg, msglen); /* Step 2:复制数据到内核消息体 */
mqmsg->priority = prio;
mqmsg->msglen = msglen;

flags = enter_critical_section(); /* Step 3:关中断,保护链表 */

if (msgq->nmsgs >= msgq->maxmsgs) { /* Step 4:队列满了? */
if (O_NONBLOCK) { ret = -EAGAIN; goto out; } /* 非阻塞:立即返回 EAGAIN */
ret = nxmq_wait_send(msgq, abstime, ticks); /* 阻塞:挂到 waitfornotfull 等 */
}

nxmq_add_queue(msgq, mqmsg, prio); /* Step 5:按优先级插入消息链表 */

if (msgq->nmsgs++ == 0) /* Step 6:从空变非空 */
nxmq_pollnotify(msgq, POLLIN); /* 通知 poll() 等待者 */

nxmq_notify_send(msgq); /* Step 7:唤醒阻塞的接收者 */

out:
leave_critical_section(flags);
if (ret < 0) nxmq_free_msg(mqmsg); /* 失败则释放已分配的消息 */
return ret;
}

优先级插入(文件:sched/mqueue/mq_send.c:206-240nxmq_add_queue):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void nxmq_add_queue(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg, unsigned int prio)
{
FAR struct mqueue_msg_s *next;
FAR struct mqueue_msg_s *prev = NULL;

/* 遍历消息链表,找到第一个优先级低于新消息的位置 */
list_for_every_entry(&msgq->msglist, next, struct mqueue_msg_s, node) {
if (prio > next->priority) break; /* 找到了插入点 */
prev = next;
}

if (prev)
list_add_after(&prev->node, &mqmsg->node); /* 插入中间 */
else
list_add_head(&msgq->msglist, &mqmsg->node); /* 插入头部(最高优先级)*/
}

链表是按优先级降序排列的。所以 prio=200 的消息排在 prio=100 的前面,mq_receive() 从链表头取消息即可获得最高优先级消息——O(1)取出,开销仅发生在 mq_send 时的 O(n) 插入。

3.5 消息接收:mq_receive() 完整链路

内核入口(文件:sched/mqueue/mq_receive.c:525-599):

与发送对称,mq_receive() 委托给 nxmq_receive() -> file_mq_receive()

1
2
3
4
5
6
7
8
ssize_t nxmq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR unsigned int *prio)
{
file_get(mqdes, &filep); /* fd → struct file * */
ret = file_mq_receive(filep, msg, msglen, prio);
file_put(filep);
return ret;
}

核心接收逻辑(文件:sched/mqueue/mq_receive.c:137-239file_mq_timedreceive_internal):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static ssize_t file_mq_timedreceive_internal(FAR struct file *mq,
FAR char *msg, size_t msglen, FAR unsigned int *prio, ...)
{
msgq = mq->f_inode->i_private;

flags = enter_critical_section(); /* 关中断 */

/* Step 1:从链表头取出消息(最高优先级,O(1))*/
mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&msgq->msglist);

if (mqmsg == NULL) { /* Step 2:队列为空? */
if (O_NONBLOCK || up_interrupt_context()) /* 不能阻塞的场景 */
{ leave_critical_section(flags); return -EAGAIN; }
ret = nxmq_wait_receive(msgq, &mqmsg, ...); /* 阻塞:挂到 waitfornotempty 等 */
/* 被唤醒后 mqmsg 已由 notifier 放入 */
}

if (msgq->nmsgs-- == msgq->maxmsgs) /* Step 3:从满变非满 */
nxmq_pollnotify(msgq, POLLOUT); /* 通知 poll() 等待者 */

nxmq_notify_receive(msgq); /* Step 4:唤醒阻塞的发送者 */

leave_critical_section(flags);

/* Step 5:复制消息到用户缓冲区 */
if (prio) *prio = mqmsg->priority;
memcpy(msg, mqmsg->mail, mqmsg->msglen);
ret = mqmsg->msglen;

nxmq_free_msg(mqmsg); /* Step 6:释放消息节点 */
return ret;
}

发送与接收的对称性

阻塞条件 阻塞队列 任务状态 对端唤醒函数
发送方 nmsgs >= maxmsgs waitfornotfull TSTATE_WAIT_MQNOTFULL nxmq_notify_receive
接收方 nmsgs == 0 waitfornotempty TSTATE_WAIT_MQNOTEMPTY nxmq_notify_send

这是经典的生产者-消费者同步模式:生产者满则等,消费者取走唤醒之;消费者空则等,生产者放入唤醒之。

3.6 阻塞与唤醒机制详解

消息队列的阻塞不是简单的 sleep()——它涉及调度器、优先级排序、while 循环重试和超时看门狗的四方协作。

3.6.1 发送者阻塞(nxmq_wait_send

文件:sched/mqueue/mq_sndinternal.c:127-218

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int nxmq_wait_send(FAR struct mqueue_inode_s *msgq,
FAR const struct timespec *abstime, clock_t ticks)
{
FAR struct tcb_s *rtcb = this_task();

/* Step A:设置超时看门狗(如果指定了超时)*/
if (abstime)
wd_start_realtime(&rtcb->waitdog, abstime, nxmq_sndtimeout, (wdparm_t)rtcb);

/* Step B:while 循环——因为醒来后队列可能又满了(竞态)*/
while (msgq->nmsgs >= msgq->maxmsgs)
{
rtcb->waitobj = msgq; /* 记录等待哪个队列 */
msgq->cmn.nwaitnotfull++; /* 等待计数 +1 */

nxsched_remove_self(rtcb); /* 从运行队列移除 */
rtcb->task_state = TSTATE_WAIT_MQNOTFULL; /* 标记阻塞状态 */
nxsched_add_prioritized(rtcb, MQ_WNFLIST(msgq->cmn)); /* 按优先级插入等待队列 */

up_switch_context(this_task(), rtcb); /* 触发上下文切换 */

/* ===== 在此阻塞,CPU 运行其他任务 ===== */

/* 被唤醒后回到此处。检查 errcode 判断唤醒原因 */
if (rtcb->errcode != OK) break; /* 超时(EAGAIN)或信号中断(EINTR) */
}

/* Step C:取消超时看门狗 */
if (abstime || ticks >= 0) wd_cancel(&rtcb->waitdog);
return -rtcb->errcode;
}

关键细节

  • nxsched_add_prioritized() 将 TCB 按优先级插入 waitfornotfull 链表——高优先级任务排前面,唤醒时先被取出。这与管道的 FIFO 唤醒不同。
  • while 循环绝对不能省:另一个更高优先级的等待者可能在当前任务重新运行前抢走了槽位,导致队列再次满了。
  • rtcb->waitobj = msgq 将当前任务与队列绑定——当任务被 mq_close 意外结束或信号杀死时,内核可通过 waitobj 将其从等待链表中移除。

下图展示了发送者阻塞与唤醒的完整时序——从 mq_send 发现队列已满开始,经调度器阻塞、接收者取走消息、notify 唤醒、到最后消息入队的全过程:

mq_send

3.6.2 接收者阻塞(nxmq_wait_receive

文件:sched/mqueue/mq_rcvinternal.c:127-207

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s **rcvmsg, ...)
{
FAR struct tcb_s *rtcb = this_task();

if (abstime)
wd_start_realtime(&rtcb->waitdog, abstime, nxmq_rcvtimeout, (wdparm_t)rtcb);

/* while 循环体内部也在尝试 list_remove_head()
* ——消息可能在进入 while 之前已被放入,直接返回,不阻塞 */
while ((newmsg = list_remove_head(&msgq->msglist)) == NULL)
{
msgq->cmn.nwaitnotempty++;
rtcb->waitobj = msgq;
nxsched_remove_self(rtcb);
rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY; /* 不同的阻塞状态 */
nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn)); /* 不同的等待队列 */

up_switch_context(this_task(), rtcb);

if (rtcb->errcode != OK) break;
}

if (abstime || ticks >= 0) wd_cancel(&rtcb->waitdog);
*rcvmsg = newmsg;
return -rtcb->errcode;
}

注意:与 nxmq_wait_send 不同,接收者 while 循环的内部也在尝试 list_remove_head()——消息可能恰好在进入循环的瞬间被放入队列,此时直接拿到消息返回,根本不会阻塞。这是无锁编程中的”乐观尝试”模式。

下图展示了接收者阻塞与唤醒的完整时序——从 mq_receive 发现队列为空开始,经调度器阻塞、发送者放入消息、notify 唤醒,到最终拿到消息的全过程:

mq_receive

3.6.3 唤醒阻塞者(nxmq_notify_send / nxmq_notify_receive

发送者放入消息后 → 唤醒接收者mq_sndinternal.c:239-305):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void nxmq_notify_send(FAR struct mqueue_inode_s *msgq)
{
if (msgq->cmn.nwaitnotempty > 0)
{
/* dq_remfirst:从优先级等待队列取出队首(最高优先级)任务 */
FAR struct tcb_s *btcb = (FAR struct tcb_s *)dq_remfirst(MQ_WNELIST(msgq->cmn));

wd_cancel(&btcb->waitdog); /* 取消超时看门狗 */
msgq->cmn.nwaitnotempty--;
btcb->waitobj = NULL;

if (nxsched_add_readytorun(btcb)) /* 加入就绪队列 */
up_switch_context(this_task(), rtcb); /* 如果优先级更高,立即切换 */
}
}

接收者取走消息后 → 唤醒发送者mq_rcvinternal.c:231-272):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void nxmq_notify_receive(FAR struct mqueue_inode_s *msgq)
{
if (msgq->cmn.nwaitnotfull > 0)
{
/* 从 waitfornotfull 等待队列取队首 */
FAR struct tcb_s *btcb = (FAR struct tcb_s *)dq_remfirst(MQ_WNFLIST(msgq->cmn));

wd_cancel(&btcb->waitdog);
msgq->cmn.nwaitnotfull--;
btcb->waitobj = NULL;

if (nxsched_add_readytorun(btcb))
up_switch_context(this_task(), rtcb);
}
}

两个 notify 函数的对称性

触发操作 调用函数 扫描的队列 被唤醒的任务状态 唤醒后的效果
mq_send 放入消息 nxmq_notify_send waitfornotempty TSTATE_WAIT_MQNOTEMPTY 接收者 while 循环重试,拿到消息
mq_receive 取走消息 nxmq_notify_receive waitfornotfull TSTATE_WAIT_MQNOTFULL 发送者 while 循环重试,队列有空位

3.6.4 超时机制

阻塞等待支持两种超时方式:

  1. 绝对时间mq_timedsend / mq_timedreceive):通过 wd_start_realtime() 设置截止时间。
  2. 相对 tick(内部接口):通过 wd_start() 设置 tick 数。

超时回调函数(mq_sndinternal.c:68-93):

1
2
3
4
5
6
7
8
9
10
11
static void nxmq_sndtimeout(wdparm_t arg)
{
FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg;

flags = enter_critical_section();

if (wtcb->task_state == TSTATE_WAIT_MQNOTFULL) /* 仍在等待中(未被正常唤醒)*/
nxmq_wait_irq(wtcb, ETIMEDOUT); /* 设置 errcode 并唤醒 */

leave_critical_section(flags);
}

nxmq_wait_irq() 将任务的 errcode 设为 ETIMEDOUT,从等待队列中移除,再加入就绪队列。任务被调度执行时,while 循环检查 errcode != OK,跳出循环并返回错误。注意回调中重新检查 task_state——任务可能已在超时触发的瞬间被正常唤醒。

3.7 VFS 集成

消息队列在 VFS 中以 inode 形式存在,路径形如 /var/mqueue/myqueue,类型为 FSNODEFLAG_TYPE_MQUEUE。关联的文件操作表(fs/mqueue/mq_open.c:58-69):

1
2
3
4
5
6
7
8
9
10
11
static const struct file_operations g_nxmq_fileops = {
NULL, /* open — 无需额外 open 操作 */
nxmq_file_close, /* close — 引用计数归零时释放 msgq */
NULL, /* read — 不支持 read(),须用 mq_receive() */
NULL, /* write — 不支持 write(),须用 mq_send() */
NULL, /* seek — 不支持 */
NULL, /* ioctl — 不支持 */
NULL, /* mmap — 不支持 */
NULL, /* truncate — 不支持 */
nxmq_file_poll /* poll — 支持 poll/select */
};

注意 readwrite 操作为 NULL——消息队列必须通过 mq_send() / mq_receive() 专用 API 操作,不能用 read() / write() 系统调用。VFS 集成的真正意义是:支持 poll() 的统一事件通知(POLLIN / POLLOUT)、引用计数管理(close / unlink),以及与其他文件系统类型统一的路径管理。

3.8 对比:消息队列 vs 信号量 vs 管道阻塞机制

特性 消息队列 信号量 管道
阻塞队列类型 按优先级排序的 dq_queue_t 按优先级排序的 dq_queue_t 计数信号量 sem_t
阻塞状态 TSTATE_WAIT_MQNOTEMPTY/FULL TSTATE_WAIT_SEM 信号量内部状态
唤醒顺序 按优先级(高优先级先醒) 按优先级 FIFO(先等先醒)
超时支持 是(watchdog timer) 是(watchdog timer) 是(信号量超时)

消息队列和信号量的阻塞机制都属于”调度器级优先级等待队列”——阻塞时将 TCB 按优先级插入到 dq_queue_t 中,唤醒时 dq_remfirst() 取出最高优先级者。这保证了实时系统中紧急任务能优先获得消息。

消息队列解决了”结构化消息传递”的问题。如果需要更高性能的大数据共享(避免拷贝),则需要共享内存。


4. 深入:共享内存内核实现(KERNEL 模式)

其他 IPC 都需要数据拷贝(用户→内核→用户,至少两次)。共享内存通过将同一物理页映射到多个进程的虚拟地址空间,实现零拷贝——进程 A 写入的数据,进程 B 立即可见,无需内核介入。

代价是:需要额外的同步机制(信号量/互斥锁)来协调读写时序。

下图展示了两个进程通过共享内存共享同一物理页的完整流程:

sm

4.1 核心数据结构

文件:mm/shm/shm.h:83-93

1
2
3
4
5
6
7
8
struct shm_region_s
{
struct shmid_ds sr_ds; /* POSIX shmid_ds(大小、权限、attach 计数)*/
bool sr_flags; /* SRFLAG_INUSE / SRFLAG_UNLINKED */
key_t sr_key; /* 查找键 */
mutex_t sr_lock; /* 保护本区域的互斥锁 */
uintptr_t sr_pages[CONFIG_ARCH_SHM_NPAGES]; /* 物理页地址数组 */
};

全局区域表:

1
2
3
4
5
6
7
8
struct shm_info_s
{
mutex_t si_lock;
struct shm_region_s si_region[CONFIG_ARCH_SHM_MAXREGIONS];
};

/* 单例实例 */
struct shm_info_s g_shminfo;

关键字段 sr_pages[]:存储该共享内存区域所有物理页的地址。多个进程通过各自的页表映射到这些相同的物理页——这就是”共享”的本质。

4.2 shmget():创建/获取共享内存

文件:mm/shm/shmget.c:364-482

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int shmget(key_t key, size_t size, int shmflg)
{
/* Step 1: 在全局表中查找 key */
regionid = shm_find(key); /* 线性扫描 si_region[] */

if (regionid < 0 && (shmflg & IPC_CREAT))
{
/* Step 2: 找一个空闲 slot */
regionid = shm_reserve();

/* Step 3: 分配物理页 */
shm_extend(regionid, size);
/* → 循环调用 mm_pgalloc(1),每次分配一个 4KB 物理页
* 存入 sr_pages[] 数组 */
}

return regionid; /* 返回 shmid(就是数组索引)*/
}

4.3 shmat():映射到进程地址空间

文件:mm/shm/shmat.c:208-308

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
FAR void *shmat(int shmid, FAR const void *shmaddr, int shmflg)
{
FAR struct shm_region_s *region = &g_shminfo.si_region[shmid];

/* Step 1: 在进程虚拟地址空间中找一块空闲 VA 区域 */
vaddr = vm_alloc_region(group_mm, NULL, region->sr_ds.shm_segsz);

/* Step 2: 将物理页映射到该 VA(写 L1/L2 页表)*/
up_shmat(region->sr_pages, npages, (uintptr_t)vaddr);

/* Step 3: 注册映射关系(用于 shmdt 时查找)*/
mm_map_add(group_mm, &entry);

/* Step 4: 更新元数据 */
region->sr_ds.shm_nattch++;

return (FAR void *)vaddr;
}

4.4 up_shmat():ARMv7-A MMU 页表操作

文件:arch/arm/src/armv7-a/arm_addrenv_shm.c:69-156

这是共享内存的硬件层核心——将物理页写入调用进程的 L2 页表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int up_shmat(FAR uintptr_t *pages, unsigned int npages, uintptr_t vaddr)
{
for (i = 0; i < npages; i++)
{
/* 确保该 VA 对应的 L2 页表存在(不存在则分配)*/
l2table = get_pgtable(addrenv, vaddr);

/* 计算 L2 索引 */
index = (vaddr & 0x000ff000) >> 12;

/* 写入 L2 PTE:物理页地址 + 用户数据权限 */
l2table[index] = pages[i] | MMU_L2_UDATAFLAGS;

/* 刷新 D-Cache */
up_flush_dcache(&l2table[index], sizeof(uint32_t));

vaddr += MM_PGSIZE;
}
}

跨进程共享的本质

1
2
3
4
5
6
7
8
Process A L2 page table:            Process B L2 page table:
+---------------------------------+ +---------------------------------+
| VA 0x80400000 -> PA 0x40350000 | | VA 0x80500000 -> PA 0x40350000 | <- same phys page!
| VA 0x80401000 -> PA 0x40351000 | | VA 0x80501000 -> PA 0x40351000 | <- same phys page!
+---------------------------------+ +---------------------------------+

Process A writes to VA 0x80400000 -> actually writes PA 0x40350000
Process B reads from VA 0x80500000 -> actually reads PA 0x40350000 -> sees A's data!

虚拟地址可以不同(每个进程的 VA 由 vm_alloc_region() 独立分配),但物理页相同——这就是零拷贝共享的原理。

虚拟地址可以不同(每个进程的 VA 由 vm_alloc_region() 独立分配),但物理页相同——这就是零拷贝共享的原理。

4.5 shmdt():解除映射

文件:mm/shm/shmdt.c:67-113

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int shmdt(FAR const void *shmaddr)
{
/* 查找映射记录 */
entry = mm_map_find(group_mm, shmaddr);

/* 从 L2 页表中清除 PTE(不释放物理页!)*/
up_shmdt((uintptr_t)shmaddr, npages);

/* 从映射列表中移除 */
mm_map_remove(group_mm, entry);

/* 减少 attach 计数 */
region->sr_ds.shm_nattch--;

/* 如果标记为删除且无 attach,释放物理页 */
if (nattch == 0 && (sr_flags & SRFLAG_UNLINKED))
shm_destroy(shmid);
}

注意up_shmdt() 清除 PTE 但不释放物理页——物理页由 shm_destroy() 在最后一个 detach 且标记删除后统一释放。这确保了其他进程的映射不会失效。

4.6 POSIX shm_open 风格(通过 VFS)

除了 System V shmget/shmat,NuttX 也支持 POSIX shm_open() + mmap() 风格。它在 VFS 中以 /dev/shm/name 路径注册 inode,通过 shmfs 文件系统(fs/shm/shmfs.c)提供 read/write/mmap/truncate 操作。mmap() 内部调用相同的 MMU 映射逻辑。

共享内存实现了零拷贝高性能共享。如果只需简单的流式字节传输,管道是更轻量的选择。


5. 深入:管道内核实现

管道是最简单的 IPC——一端写入字节流,另一端读取。没有消息边界、没有优先级、没有MMU操作。核心就是一个环形缓冲区 + 两个信号量。开销极低,适合大量小数据的顺序传递(如 shell 管道 ls | grep)。

5.1 核心数据结构

文件:drivers/pipes/pipe_common.h:118-140

1
2
3
4
5
6
7
8
9
10
struct pipe_dev_s
{
rmutex_t d_bflock; /* 缓冲区互斥锁 */
sem_t d_rdsem; /* 读阻塞信号量(缓冲区空时读者等待)*/
sem_t d_wrsem; /* 写阻塞信号量(缓冲区满时写者等待)*/
pipe_ndx_t d_bufsize; /* 缓冲区大小 */
uint8_t d_nwriters; /* 写端引用计数 */
uint8_t d_nreaders; /* 读端引用计数 */
struct circbuf_s d_buffer; /* 环形缓冲区 */
};

文件:include/nuttx/circbuf.h:52-59

1
2
3
4
5
6
7
struct circbuf_s
{
FAR void *base; /* 缓冲区基地址 */
size_t size; /* 缓冲区总大小 */
size_t head; /* 写指针 */
size_t tail; /* 读指针 */
};

环形缓冲区的可用空间 = size - (head - tail) - 1(保留 1 字节区分满/空)。

5.2 pipe() 创建流程

文件:drivers/pipes/pipe.c:258-311

1
2
3
4
5
6
7
8
9
10
11
12
13
int pipe2(int pipefd[2], int flags)
{
/* Step 1: 在 VFS 注册一个临时设备节点(如 /dev/pipe/0)*/
pipe_register(CONFIG_DEV_PIPE_SIZE, flags, devname);

/* Step 2: 打开写端和读端 */
pipefd[1] = open(devname, O_WRONLY);
pipefd[0] = open(devname, O_RDONLY);

/* Step 3: 从 VFS 注销路径(匿名管道,打开后路径消失)*/
unregister_pipedriver(devname);
/* fd 仍然有效,因为 inode 的引用计数 > 0 */
}

FIFO 的区别mkfifo() 注册的设备节点不会被注销——它持久存在于 VFS 中,任何进程都可以通过路径名打开。

5.3 写入:pipecommon_write()

文件:drivers/pipes/pipe_common.c:506-656(关键逻辑摘录)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static ssize_t pipecommon_write(FAR struct file *filep,
FAR const char *buffer, size_t len)
{
FAR struct pipe_dev_s *dev = filep->f_inode->i_private;
ssize_t nwritten = 0;

nxrmutex_lock(&dev->d_bflock);

while (nwritten < len)
{
/* 如果没有读者,返回 EPIPE(broken pipe)*/
if (dev->d_nreaders == 0) { ret = -EPIPE; break; }

/* 尝试写入环形缓冲区 */
nwrite = circbuf_write(&dev->d_buffer, buffer + nwritten, len - nwritten);

if (nwrite > 0)
{
nwritten += nwrite;
/* 唤醒等待的读者 */
pipecommon_wakeup(&dev->d_rdsem);
}

if (nwritten < len) /* 缓冲区满,需要等待 */
{
if (filep->f_oflags & O_NONBLOCK)
break; /* 非阻塞:返回已写字节数 */

/* 解锁缓冲区,在写信号量上等待 */
nxrmutex_unlock(&dev->d_bflock);
nxsem_wait(&dev->d_wrsem); /* 阻塞直到读者取走数据 */
nxrmutex_lock(&dev->d_bflock);
}
}

nxrmutex_unlock(&dev->d_bflock);
return nwritten;
}

5.4 读取:pipecommon_read()

文件:drivers/pipes/pipe_common.c:415-500(关键逻辑摘录)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static ssize_t pipecommon_read(FAR struct file *filep,
FAR char *buffer, size_t len)
{
FAR struct pipe_dev_s *dev = filep->f_inode->i_private;

nxrmutex_lock(&dev->d_bflock);

while (circbuf_is_empty(&dev->d_buffer))
{
/* 缓冲区空 + 无写者 = EOF */
if (dev->d_nwriters == 0) { ret = 0; goto out; }

/* 非阻塞模式 */
if (filep->f_oflags & O_NONBLOCK) { ret = -EAGAIN; goto out; }

/* 解锁,等待写者写入数据 */
nxrmutex_unlock(&dev->d_bflock);
nxsem_wait(&dev->d_rdsem);
nxrmutex_lock(&dev->d_bflock);
}

/* 从环形缓冲区读取 */
nread = circbuf_read(&dev->d_buffer, buffer, len);

/* 唤醒等待空间的写者 */
pipecommon_wakeup(&dev->d_wrsem);

nxrmutex_unlock(&dev->d_bflock);
return nread;
}

5.5 阻塞机制

管道使用两个计数信号量(不是按优先级排序的等待队列):

信号量 阻塞条件 唤醒时机
d_rdsem 缓冲区空,读者等待 写者写入数据后 post
d_wrsem 缓冲区满,写者等待 读者取走数据后 post

与消息队列阻塞的区别:消息队列用调度器级别的优先级等待队列(高优先级任务先被唤醒),管道用简单的计数信号量(FIFO 唤醒顺序)。

5.6 关闭语义

  • 最后一个写端关闭:读者收到 EOF(read() 返回 0)+ POLLHUP
  • 最后一个读端关闭:写者收到 -EPIPE + POLLERR + 可能收到 SIGPIPE

6. 选择指南:何时用哪种 IPC

场景 推荐 IPC 理由
保护临界区 mutex / semaphore 最低开销,无数据传输
小消息传递(< 1KB) mqueue 带优先级、带边界、阻塞语义完善
大数据共享(缓冲区/帧) shared memory + mutex 零拷贝,性能最高
流式字节传输 pipe / FIFO 简单、符合 Unix 哲学
跨进程通知(无数据) signal 异步、不需要接收方主动等待
生产者/消费者队列 mqueue 或 pipe mqueue 有优先级;pipe 更简单

7. 对比分析

特性 Semaphore Mqueue Shared Memory Pipe
数据传输 有(结构化消息) 有(任意内存) 有(字节流)
拷贝次数 0 2(用户→内核→用户) 0(零拷贝) 2(用户→内核→用户)
优先级排序 有(消息按 prio 排)
消息边界
跨进程可用 命名信号量可 是(仅 KERNEL 模式) FIFO 可
VFS 集成 /sem/name /var/mqueue/name /dev/shm/name /dev/pipe/N
阻塞机制 调度器优先级队列 调度器优先级队列 无(需用户同步) 计数信号量
KERNEL 模式要求 必须

8. 关键要点

  1. 消息队列按优先级排序——mq_send() 将消息插入到有序链表中正确位置,mq_receive() 总是取出链表头(最高优先级)。

  2. 共享内存是唯一的零拷贝 IPC——通过将同一物理页映射到多个进程的 L2 页表实现,仅在 KERNEL 模式(有 MMU)下可用。

  3. 管道用环形缓冲区 + 两个信号量实现读写阻塞——缓冲区空时读者在 d_rdsem 等待,满时写者在 d_wrsem 等待。

  4. 消息队列的阻塞使用调度器级优先级等待队列TSTATE_WAIT_MQNOTEMPTY/FULL),高优先级任务优先被唤醒;管道用简单信号量(FIFO 顺序)。

  5. 共享内存的 shmat() 不分配物理页——物理页在 shmget() 时已分配。shmat() 只是将已有物理页映射到调用进程的页表中。

  6. 匿名管道在 pipe() 后立即从 VFS 注销路径——只有持有 fd 的进程能访问,体现了”匿名”语义。FIFO 的路径持久存在。

  7. KERNEL 模式下所有 IPC 调用都走 SVC 系统调用路径——sys_call4 宏触发 svc 指令,arm_syscall()dispatch_syscall()g_stublookup[] → STUB → 内核实现,形成完整的用户态到内核态的桥接。

  8. 所有 IPC 的阻塞等待都支持超时——mq_timedreceive()sem_timedwait() 通过 watchdog timer 实现。


9. 参考文件索引

文件路径 关键内容 引用行号
include/nuttx/mqueue.h mqueue_inode_s, mqueue_cmn_s 103-131
sched/mqueue/mqueue.h mqueue_msg_s, 全局消息池 68-105
sched/mqueue/mq_send.c mq_send(), nxmq_add_queue() 206-369
sched/mqueue/mq_receive.c mq_receive(), file_mq_timedreceive_internal() 137-239
sched/mqueue/mq_sndinternal.c nxmq_wait_send(), nxmq_notify_send() 127-306
sched/mqueue/mq_rcvinternal.c nxmq_wait_receive(), nxmq_notify_receive() 127-273
fs/mqueue/mq_open.c mq_open() VFS 集成, file_mq_vopen() 160-520
arch/arm/include/syscall.h sys_call4 宏(SVC 内联汇编) 306-323
arch/arm/src/armv7-a/arm_syscall.c arm_syscall(), dispatch_syscall() 123-556
syscall/syscall_stublookup.c g_stublookup[] 存根表 88-95
syscall/syscall.csv 系统调用签名定义(mksyscall 输入) 74-76
mm/shm/shm.h shm_region_s, shm_info_s 83-103
mm/shm/shmget.c shmget(), shm_extend(), g_shminfo 47-482
mm/shm/shmat.c shmat(), munmap_shm() 208-308
mm/shm/shmdt.c shmdt() 67-113
mm/shm/shmctl.c shmctl(), shm_destroy() 108-248
arch/arm/src/armv7-a/arm_addrenv_shm.c up_shmat(), up_shmdt() (MMU 操作) 69-242
fs/shm/shmfs.c shmfs 文件操作(read/write/mmap) 65-368
drivers/pipes/pipe_common.h pipe_dev_s, circbuf_s 118-140
drivers/pipes/pipe_common.c pipecommon_read/write/open/close 134-952
drivers/pipes/pipe.c pipe2(), pipe_register() 60-311
drivers/pipes/fifo.c nx_mkfifo() 100-120
include/nuttx/circbuf.h circbuf_s, circbuf_read/write 52-320

外部参考文档:

文档 说明
IEEE Std 1003.1 (POSIX.1-2017) mqueue, shm, semaphore, pipe 标准接口定义
NuttX Documentation: https://nuttx.apache.org/docs/latest/ 官方 API 参考和配置指南
ARM Architecture Reference Manual (ARMv7-A/R) L2 页表映射(共享内存 MMU 操作)