hloop
hloop 的作用是对并发连接进行管理和调度,采用了单线程事件驱动 + 非阻塞IO的模型。
hloop 不是通过多线程处理并发,而是通过IO多路复用(epoll/select) 在单个线程中同时监听多个连接。
先看一下 hloop_s 数据结构
struct hloop_s {
// IO对象数组:用fd作为索引
struct io_array ios; // ios.ptr[fd] = hio_t*
uint32_t nios; // 当前IO对象数量
// 待处理事件队列
hevent_t* pendings[11]; // 按优先级分组
uint32_t npendings;
// IO观察器(epoll实例)
void* iowatcher; // epoll_fd
};
hloop 中是怎么对多个连接进行存储的?答曰:采用基于 fd 索引的数组。可以看一下是怎么分配IO对象并存入数组的:
hio_t* hio_get(hloop_t* loop, int fd) {
hio_t* io = __hio_get(loop, fd); // 动态扩容数组
if (io == NULL) {
// 1. 分配IO对象
HV_ALLOC_SIZEOF(io);
hio_init(io);
// 2. 存储到数组中,fd作为索引
io->loop = loop;
io->fd = fd;
loop->ios.ptr[fd] = io; // ← 直接用fd索引
}
return io;
}
点击查看 __hio_get 动态扩容实现
static inline hio_t* __hio_get(hloop_t* loop, int fd) {
// 动态扩容数组
if (fd >= loop->ios.maxsize) {
int newsize = ceil2e(fd); // 向上取2的幂
newsize = MAX(newsize, IO_ARRAY_INIT_SIZE); // 最小1024
io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);
}
return loop->ios.ptr[fd];
}
hloop 中是怎么对连接进行调度的?答曰:采用 epoll 方式,首先调用epoll_wait阻塞等待有事件变化,然后遍历就绪事件处理,设置就绪标志,加入待等待队列中。
// iowatcher.c
int iowatcher_poll_events(hloop_t* loop, int timeout) {
// 1. 调用epoll_wait阻塞等待
int nevents = epoll_wait(
loop->iowatcher->epfd, // epoll实例
loop->iowatcher->events, // 就绪事件数组
loop->iowatcher->maxevents, // 最大事件数
timeout // 超时时间(ms)
);
// 2. 遍历就绪事件
for (int i = 0; i < nevents; i++) {
int fd = events[i].data.fd;
hio_t* io = loop->ios.ptr[fd]; // O(1)查找
// 3. 设置就绪标志
if (events[i].events & EPOLLIN) {
io->revents |= HV_READ; // 可读
}
if (events[i].events & EPOLLOUT) {
io->revents |= HV_WRITE; // 可写
}
// 4. 加入待处理队列
EVENT_PENDING(io);
}
return nevents;
}
事件调度谁先谁后?优先级规则?答曰:设置一个优先级队列,设置不同事件的优先级,从高优先级到低优先级处理。
static int hloop_process_pendings(hloop_t* loop) {
hevent_t* cur = NULL;
hevent_t* next = NULL;
int ncbs = 0;
// 从高优先级到低优先级处理
for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
cur = loop->pendings[i];
while (cur) {
next = cur->pending_next;
if (cur->pending && cur->loop == loop) {
if (cur->active && cur->cb) {
cur->cb(cur); // ← 调用回调
++ncbs;
}
cur->pending = 0;
if (cur->destroy) {
EVENT_DEL(cur);
}
}
cur = next;
}
loop->pendings[i] = NULL;
}
loop->npendings = 0;
return ncbs;
}
优先级定义
#define HEVENT_LOWEST_PRIORITY (-5) // 空闲事件
#define HEVENT_LOW_PRIORITY (-3)
#define HEVENT_NORMAL_PRIORITY 0 // 普通IO事件
#define HEVENT_HIGH_PRIORITY 3 // 监听socket
#define HEVENT_HIGHEST_PRIORITY 5 // 定时器、信号
#define HEVENT_PRIORITY_SIZE 11
#define HEVENT_PRIORITY_INDEX(priority) (priority - HEVENT_LOWEST_PRIORITY)
优先级队列结构
loop->pendings[0] → 空闲事件 (优先级 -5)
loop->pendings[1] →
loop->pendings[2] → 低优先级 (优先级 -3)
loop->pendings[3] →
loop->pendings[4] →
loop->pendings[5] → 普通IO事件 (优先级 0) ← 客户端连接
loop->pendings[6] →
loop->pendings[7] →
loop->pendings[8] → 高优先级 (优先级 3) ← 监听socket
loop->pendings[9] →
loop->pendings[10] → 最高优先级 (优先级 5) ← 定时器
完整的事件处理流程
主循环,在while循环中,一直处理事件
int hloop_run(hloop_t* loop) {
while (loop->status != HLOOP_STATUS_STOP) {
++loop->loop_cnt;
// 处理事件
hloop_process_events(loop, HLOOP_MAX_BLOCK_TIME);
}
return 0;
}
事件处理的步骤
int hloop_process_events(hloop_t* loop, int timeout_ms) {
// ===== 步骤1: 计算阻塞时间 =====
int32_t blocktime_ms = timeout_ms; // 默认100ms
if (loop->ntimers) {
// 如果有定时器,计算最近到期时间
int64_t min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout
- loop->cur_hrtime;
blocktime_ms = MIN(blocktime_ms, min_timeout / 1000 + 1);
}
// ===== 步骤2: IO多路复用 (阻塞等待) =====
if (loop->nios) {
nios = hloop_process_ios(loop, blocktime_ms);
// 内部调用 epoll_wait:
// - 如果有IO就绪,立即返回
// - 如果无IO就绪,最多阻塞 blocktime_ms
// - 其他线程调用 hloop_wakeup 可提前唤醒
} else {
hv_msleep(blocktime_ms); // 无IO,直接sleep
}
hloop_update_time(loop); // 更新时间戳
// ===== 步骤3: 处理定时器 =====
if (loop->ntimers) {
ntimers = hloop_process_timers(loop);
}
// ===== 步骤4: 处理空闲事件 =====
if (loop->npendings == 0 && loop->nidles) {
nidles = hloop_process_idles(loop);
}
// ===== 步骤5: 分发待处理事件 (调用回调) =====
int ncbs = hloop_process_pendings(loop);
return ncbs;
}