hloop

hloop

hloop 的作用是对并发连接进行管理和调度,采用了单线程事件驱动 + 非阻塞IO的模型。

hloop 不是通过多线程处理并发,而是通过IO多路复用(epoll/select) 在单个线程中同时监听多个连接。

先看一下 hloop_s 数据结构

struct hloop_s {
    // IO对象数组:用fd作为索引
    struct io_array ios;        // ios.ptr[fd] = hio_t*
    uint32_t nios;              // 当前IO对象数量
    
    // 待处理事件队列
    hevent_t* pendings[11];     // 按优先级分组
    uint32_t npendings;
    
    // IO观察器(epoll实例)
    void* iowatcher;            // epoll_fd
};

hloop 中是怎么对多个连接进行存储的?答曰:采用基于 fd 索引的数组。可以看一下是怎么分配IO对象并存入数组的:

hio_t* hio_get(hloop_t* loop, int fd) {
    hio_t* io = __hio_get(loop, fd); // 动态扩容数组
    if (io == NULL) {
        // 1. 分配IO对象
        HV_ALLOC_SIZEOF(io);
        hio_init(io);
        
        // 2. 存储到数组中,fd作为索引
        io->loop = loop;
        io->fd = fd;
        loop->ios.ptr[fd] = io;  // ← 直接用fd索引
    }
    return io;
}
点击查看 __hio_get 动态扩容实现
static inline hio_t* __hio_get(hloop_t* loop, int fd) {
    // 动态扩容数组
    if (fd >= loop->ios.maxsize) {
        int newsize = ceil2e(fd);  // 向上取2的幂
        newsize = MAX(newsize, IO_ARRAY_INIT_SIZE);  // 最小1024
        io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);
    }
    return loop->ios.ptr[fd];
}

hloop 中是怎么对连接进行调度的?答曰:采用 epoll 方式,首先调用epoll_wait阻塞等待有事件变化,然后遍历就绪事件处理,设置就绪标志,加入待等待队列中。

// iowatcher.c
int iowatcher_poll_events(hloop_t* loop, int timeout) {
    // 1. 调用epoll_wait阻塞等待
    int nevents = epoll_wait(
        loop->iowatcher->epfd,    // epoll实例
        loop->iowatcher->events,  // 就绪事件数组
        loop->iowatcher->maxevents, // 最大事件数
        timeout                    // 超时时间(ms)
    );
    
    // 2. 遍历就绪事件
    for (int i = 0; i < nevents; i++) {
        int fd = events[i].data.fd;
        hio_t* io = loop->ios.ptr[fd];  // O(1)查找
        
        // 3. 设置就绪标志
        if (events[i].events & EPOLLIN) {
            io->revents |= HV_READ;  // 可读
        }
        if (events[i].events & EPOLLOUT) {
            io->revents |= HV_WRITE; // 可写
        }
        
        // 4. 加入待处理队列
        EVENT_PENDING(io);
    }
    
    return nevents;
}

事件调度谁先谁后?优先级规则?答曰:设置一个优先级队列,设置不同事件的优先级,从高优先级到低优先级处理。

static int hloop_process_pendings(hloop_t* loop) {
    hevent_t* cur = NULL;
    hevent_t* next = NULL;
    int ncbs = 0;
    
    // 从高优先级到低优先级处理
    for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
        cur = loop->pendings[i];
        while (cur) {
            next = cur->pending_next;
            if (cur->pending && cur->loop == loop) {
                if (cur->active && cur->cb) {
                    cur->cb(cur);  // ← 调用回调
                    ++ncbs;
                }
                cur->pending = 0;
                if (cur->destroy) {
                    EVENT_DEL(cur);
                }
            }
            cur = next;
        }
        loop->pendings[i] = NULL;
    }
    
    loop->npendings = 0;
    return ncbs;
}

优先级定义

#define HEVENT_LOWEST_PRIORITY    (-5)  // 空闲事件
#define HEVENT_LOW_PRIORITY       (-3)
#define HEVENT_NORMAL_PRIORITY      0   // 普通IO事件
#define HEVENT_HIGH_PRIORITY        3   // 监听socket
#define HEVENT_HIGHEST_PRIORITY     5   // 定时器、信号

#define HEVENT_PRIORITY_SIZE  11
#define HEVENT_PRIORITY_INDEX(priority) (priority - HEVENT_LOWEST_PRIORITY)

优先级队列结构

loop->pendings[0]  → 空闲事件 (优先级 -5)
loop->pendings[1]  → 
loop->pendings[2]  → 低优先级 (优先级 -3)
loop->pendings[3]  → 
loop->pendings[4]  → 
loop->pendings[5]  → 普通IO事件 (优先级 0)  ← 客户端连接
loop->pendings[6]  → 
loop->pendings[7]  → 
loop->pendings[8]  → 高优先级 (优先级 3)     ← 监听socket
loop->pendings[9]  → 
loop->pendings[10] → 最高优先级 (优先级 5)   ← 定时器

完整的事件处理流程

主循环,在while循环中,一直处理事件

int hloop_run(hloop_t* loop) {
    while (loop->status != HLOOP_STATUS_STOP) {
        ++loop->loop_cnt;
        
        // 处理事件
        hloop_process_events(loop, HLOOP_MAX_BLOCK_TIME);
    }
    return 0;
}

事件处理的步骤

int hloop_process_events(hloop_t* loop, int timeout_ms) {
    // ===== 步骤1: 计算阻塞时间 =====
    int32_t blocktime_ms = timeout_ms;  // 默认100ms
    if (loop->ntimers) {
        // 如果有定时器,计算最近到期时间
        int64_t min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout 
                            - loop->cur_hrtime;
        blocktime_ms = MIN(blocktime_ms, min_timeout / 1000 + 1);
    }
    
    // ===== 步骤2: IO多路复用 (阻塞等待) =====
    if (loop->nios) {
        nios = hloop_process_ios(loop, blocktime_ms);
        // 内部调用 epoll_wait:
        //   - 如果有IO就绪,立即返回
        //   - 如果无IO就绪,最多阻塞 blocktime_ms
        //   - 其他线程调用 hloop_wakeup 可提前唤醒
    } else {
        hv_msleep(blocktime_ms);  // 无IO,直接sleep
    }
    
    hloop_update_time(loop);  // 更新时间戳
    
    // ===== 步骤3: 处理定时器 =====
    if (loop->ntimers) {
        ntimers = hloop_process_timers(loop);
    }
    
    // ===== 步骤4: 处理空闲事件 =====
    if (loop->npendings == 0 && loop->nidles) {
        nidles = hloop_process_idles(loop);
    }
    
    // ===== 步骤5: 分发待处理事件 (调用回调) =====
    int ncbs = hloop_process_pendings(loop);
    
    return ncbs;
}