Kernel-14-kfifo

做内核驱动开发的时候,从 Linux收包的时候用到了 kfifo 这种结构接收报文进行报文分析,这里学习一下

kfifo 其实是一个无锁环形队列,主要用来解决进出速度不匹配的情况。当生产速度太快或多生产者的情况,消费者可能处理不过来可能导致服务崩溃

基础结构

源码位置:linux/include/linux/kfifo.h

首先是 kfifo 数据结构

1
2
3
4
5
6
7
struct __kfifo {
unsigned int in; //进队索引
unsigned int out; //出队索引
unsigned int mask; //大小掩码
unsigned int esize; //单个元素大小
void *data; //队列缓存指针
};

上面就是一个普通的环形队列结构,但是重点却是下面这个数据结构

1
2
3
4
5
6
7
8
9
#define __STRUCT_KFIFO_COMMON(datatype, recsize, ptrtype) \
union { \
struct __kfifo kfifo; \
datatype *type; \
const datatype *const_type; \
char (*rectype)[recsize]; \
ptrtype *ptr; \
ptrtype const *ptr_const; \
}

定义了一个 union 允许不同的数据类型共享相同的内存位置。在编译的时候会分配一个最大的结构体内存空间也就是 _kfifo,其他字段的使用会覆盖已有的内容

rectype 表示一个指向字符数组的指针类型

提供了一种灵活的方式来定义环形缓冲区,允许开发者根据需要选择不同的数据类型和指针类型,根据它自带例子 record-example.c 来看看它两种使用方式

动态分配

动态分配的流程步骤

第一步,定义一个 fifo 指针对象

1
2
3
4
5
6
7
8
9
10
//1. 定义一个 fifo 对象
struct kfifo_rec_ptr_1 test;

struct kfifo_rec_ptr_1 __STRUCT_KFIFO_PTR(unsigned char, 1, void);
struct kfifo_rec_ptr_2 __STRUCT_KFIFO_PTR(unsigned char, 2, void);
#define __STRUCT_KFIFO_PTR(type, recsize, ptrtype) \
{ \
__STRUCT_KFIFO_COMMON(type, recsize, ptrtype); \
type buf[0]; \
}

kfifo_rec_ptr_1 完全展开的到的是

1
2
3
4
5
6
7
8
9
10
11
12
13
struct kfifo_rec_ptr_1
{
union
{
struct __kfifo kfifo;
unsigned char *type;
const unsigned char *const_type;
char (*rectype)[1];
void *ptr;
void const *ptr_const;
};
unsigned char buf[0];
};

第二步,为缓冲队列指针分配指定大小内存

1
2
3
4
//2. 初始化 fifo 内存
#define FIFO_SIZE 128

kfifo_alloc(&test, FIFO_SIZE, GFP_KERNEL); //GFP_KERNEL 为内核分配内存的标志,用来指定分配内存时的行为和特征

这里用到了第二个比较特别的宏定义,将其展开得到:

1
2
3
4
5
6
7
#define kfifo_alloc(fifo, size, gfp_mask)                                                                   \
__kfifo_int_must_check_helper( \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
__is_kfifo_ptr(__tmp) ? __kfifo_alloc(__kfifo, size, sizeof(*__tmp->type), gfp_mask) : -EINVAL; \
}))
  • typeof((fifo) + 1) __tmp = (fifo);:使用 typeof 运算符获取 fifo 指针的类型,并创建一个名为 __tmp 的新变量,初始化为 fifo 的值。(fifo) + 1 用于获取 fifo 的类型,而不改变 fifo 的值
  • struct __kfifo *__kfifo = &__tmp->kfifo;:一个指向 __kfifo 结构的指针,将其初始化为 __tmpkfifo 成员的地址
  • __is_kfifo_ptr(__tmp) ?:条件运算符,函数用于检查 __tmp 是否是环形缓冲区指针的有效实例,如果是环形缓冲区有效实例,则分配一个环形缓冲区 __kfifo_alloc

为什么使用 (fifo) + 1 而不是直接使用 fifo 呢?

typeof((a) + 1) 用来保证传递进来的变量 a 为指针,若部位指针则引发编译错误,而不是等运行时才爆发错误

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct demo_t
{
int a;
int b;
} demo_t;

demo_t demo;

typeof((&demo) + 1) b; // 等效 demo_t* b
typeof((&demo)) b; // 等效 demo_t* b
typeof((demo) + 1) b; // 报错,结构体变量不能 + 1
typeof((demo)) b; // 不报错,等效 demo_t b

由于 + 1 不会改变 fifo 的实际值,它只是用于类型推断,所以它是安全的

回到分配上,核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
size_t esize, gfp_t gfp_mask)
{
/*
* round up to the next power of 2, since our 'let the indices
* wrap' technique works only in this case.
*/
size = roundup_pow_of_two(size);

fifo->in = 0;
fifo->out = 0;
fifo->esize = esize;

if (size < 2) {
fifo->data = NULL;
fifo->mask = 0;
return -EINVAL;
}

fifo->data = kmalloc_array(esize, size, gfp_mask);

if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
fifo->mask = size - 1;

return 0;
}

静态分配

首先声明一个缓冲队列对象

1
2
3
4
5
6
7
8
9
10
11
12
#define STRUCT_KFIFO_REC_1(size) \
struct __STRUCT_KFIFO(unsigned char, size, 1, void)

#define __STRUCT_KFIFO(type, size, recsize, ptrtype) \
{ \
__STRUCT_KFIFO_COMMON(type, recsize, ptrtype); \
type buf[((size < 2) || (size & (size - 1))) ? -1 : size]; \
}

typedef STRUCT_KFIFO_REC_1(FIFO_SIZE) mytest;

static mytest test;

静态分配与动态分配不一样的地方声明了一个 unsigned char 类型的 buf数组,数组大小为 ((size < 2) || (size & (size - 1))) ? -1 : size

这里有两点需要注意:

  1. FIFO_SIZE 默认是128,如果指定为的1,那么将编译失败
  2. 为什么要使用 size & (size - 1) 来判断大小是否是 2 的 n 次幂
    • 直接使用 in/out 进行&运算,避免进行取余计算
    • 既然是 2 的 n 次幂,那么使用的时候需要注意,比如申请 大小为 100,实际其实是 128 字节,每次出对入队都是 10 字节,那么可能出现最后一次只保存 10 字节的情况!!!!

所以在进出队列的时候需要对比进入队列大小!!!!

1
2
3
4
5
6
7
8
if(kfifo_in(&test, &hello, sizeof(hello)) != sizeof(hello))
{
//...
}

if(kfifo_out(&test, buf, sizeof(buf) != sizeof(buf)){
//...
}

静态分配会简单简单一些

1
2
3
4
5
6
7
8
9
10
11
12
INIT_KFIFO(test);

#define INIT_KFIFO(fifo) \
(void)({ \
typeof(&(fifo)) __tmp = &(fifo); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
__kfifo->in = 0; \
__kfifo->out = 0; \
__kfifo->mask = __is_kfifo_ptr(__tmp) ? 0 : ARRAY_SIZE(__tmp->buf) - 1;\
__kfifo->esize = sizeof(*__tmp->buf); \
__kfifo->data = __is_kfifo_ptr(__tmp) ? NULL : __tmp->buf; \
})

自旋锁

在 API 接口中还提供了使用自旋锁保护的 kfifo 保护的接口

1
2
kfifo_in_spinlocked
kfifo_out_spinlocked

为什么要使用自旋锁保护的接口,他这里有一段解释

1
2
3
4
5
6
7
8
9
/*
* Note about locking: There is no locking required until only one reader
* and one writer is using the fifo and no kfifo_reset() will be called.
* kfifo_reset_out() can be safely used, until it will be only called
* in the reader thread.
* For multiple writer and one reader there is only a need to lock the writer.
* And vice versa for only one writer and multiple reader there is only a need
* to lock the reader.
*/

根据读写者的数量来判断是否需要使用自旋接口,它的实现如下

1
2
3
4
5
6
7
8
9
#define	kfifo_in_spinlocked(fifo, buf, n, lock) \
({ \
unsigned long __flags; \
unsigned int __ret; \
spin_lock_irqsave(lock, __flags); \
__ret = kfifo_in(fifo, buf, n); \
spin_unlock_irqrestore(lock, __flags); \
__ret; \
})

spin_lock_irqsave 自旋锁且禁用本地中断,自旋锁能保证在多 CPU 系统中,一次只有一个 CPU 可以执行临界区代码。而禁用中断可以防止临界区代码执行期间发生上下文切换

其他 API

1
2
3
4
5
6
7
kfifo_is_empty(fifo)	//是否为空
kfifo_len(fifo) //当前元素数量
kfifo_reset(fifo) //重置索引位置
kfifo_size(fifo) //队列大小
kfifo_initialized(fifo) //检查是否初始化,返回 mask 的值
kfifo_in(fifo, buf, n) //进队
kfifo_out(fifo, buf, n) //出对

总结

  1. 声明的大小为 大于 size的最小的 2 的幂等数,所以在进出队列的时候需要判断返回值,防止截断
  2. 根据读写者数量来判断是否使用 spinlock 接口

参考文档

  1. https://zhuanlan.zhihu.com/p/500070147
  2. https://blog.csdn.net/fangye945a/article/details/86617551
  3. https://blog.csdn.net/qq_30027083/article/details/120697813