tinyhttpd
https://github.com/ithewei/libhv/blob/master/examples/tinyhttpd.c
这是一个基于 libhv 事件循环的极简 HTTP 服务器实现,采用了多线程 Reactor 模式。
Reactor 模式
多线程 Reactor(Reactor Pattern with Thread Pool / Multi-Reactor)是一种高性能网络服务器架构模式,常用于构建像 Nginx、libhv、Netty、Redis、Memcached 这样的高并发网络程序。
它是在 单线程 Reactor 的基础上,为了更好地利用多核 CPU,将 事件分发和事件处理拆分到多个线程中。
有这样几种 Reactor 模式
单线程 Reactor
+-------------------+ | Reactor 主线程 | 监听全部 socket 事件(epoll) | | → 收到事件 | | → 执行回调处理(读取/写入/业务) +-------------------+实现起来简单、无锁;缺点就是所有操作都在一个线程,CPU 利用率低,不适合高并发业务
Reactor + 工作线程池
[Reactor 主线程] | epoll_wait 等待事件 | 当 socket 可读/可写 ↓ 分发给 ↓ 工作线程池(Worker Threads) | 每个线程执行业务逻辑 / 处理数据Reactor 只负责检测事件,不做重活;实际业务交给线程池,适合高并发业务型服务器
主从 Reactor
[主 Reactor] (只负责接收连接 accept) ↓ 分发连接 +-----------------------------+ | 从 Reactor 1 | 从 Reactor 2 | ... 每个维护自己的 epoll 线程 | | | | IO 读写 | IO 读写 | +--------------+--------------+主线程处理连接建立(accept),多个子 Reactor 线程分别处理 IO 事件,Nginx 就是此架构
这里的 Reactor 模式采用上述第二种方式。
┌─────────────────────────────────────────────────────────────┐
│ Accept Thread (主线程) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ accept_loop │ │
│ │ ├─ listenfd (监听 8000 端口) │ │
│ │ └─ on_accept: 接收新连接 │ │
│ └──────────────────┬──────────────────────────────────┘ │
└─────────────────────┼──────────────────────────────────────┘
│
│ 负载均衡分发
↓
┌─────────────────────────────────────────────────────────────┐
│ Worker Threads │
├─────────────────┬─────────────────┬─────────────────────────┤
│ Worker 0 │ Worker 1 │ Worker N │
│ ┌─────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │
│ │worker_loop 0│ │ │worker_loop 1│ │ │worker_loop N│ │
│ │ ├─ conn1 │ │ │ ├─ conn3 │ │ │ ├─ conn5 │ │
│ │ ├─ conn2 │ │ │ └─ conn4 │ │ │ └─ conn6 │ │
│ └─────────────┘ │ └─────────────┘ │ └─────────────┘ │
└─────────────────┴─────────────────┴─────────────────────────┘
对应到代码,是这样创建的,将主线程作为 accept 线程,另外创建 N 个 worker 线程用于跑工作线程事件循环
static hloop_t* accept_loop = NULL; // 接收线程的事件循环
static hloop_t** worker_loops = NULL; // 工作线程的事件循环数组
int main(int argc, char** argv) {
// 1. 创建 N 个 worker 线程
worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
for (int i = 0; i < thread_num; ++i) {
worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
hthread_create(worker_thread, worker_loops[i]); // 启动工作线程
}
// 2. 主线程作为 accept 线程
accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
accept_thread(accept_loop);
}
完整流程
上述main方法里的逻辑执行完了,此时当前线程应该有一个 tcp_server,且运行了事件循环,它就是 Accept Thread,其中的 on_accept 回调函数的内容就是实现负载均衡分发:
static void on_accept(hio_t* io) {
// 1. 从 accept_loop 中剥离连接
hio_detach(io);
// ↓ 将 io 从 accept_loop 的管理中移除
// 2. 选择一个 worker 线程 (Round-Robin 轮询)
hloop_t* worker_loop = get_next_loop();
// 3. 跨线程投递事件到 worker
hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.loop = worker_loop;
ev.cb = new_conn_event; // worker 线程的回调
ev.userdata = io; // 传递 io 对象
hloop_post_event(worker_loop, &ev);
// ↓ 线程安全地投递事件到 worker_loop
}
// Round-Robin 负载均衡
static hloop_t* get_next_loop() {
static int s_cur_index = 0;
if (s_cur_index == thread_num) {
s_cur_index = 0;
}
return worker_loops[s_cur_index++];
// 示例: 0 → 1 → 2 → 3 → 0 → 1 → ...
}
当把事件投到 Workder 线程之后 (对应 new_conn_event 函数),Worker 线程会开始处理。执行的内容大致是,将 io 关联到当前工作线程的 workder_loop,做一些准备工作,然后开始读取HTTP请求内容,直到遇到 \r\n。
之后是HTTP 解析状态机 (on_recv) 执行,对应代码和流程如下
static void on_recv(hio_t* io, void* buf, int readbytes) {
char* str = (char*)buf;
http_conn_t* conn = (http_conn_t*)hevent_userdata(io);
http_msg_t* req = &conn->request;
switch (conn->state) {
// ========== 状态1: 解析请求行 ==========
case s_first_line:
// 输入: "GET /ping HTTP/1.1\r\n"
str[readbytes - 2] = '\0'; // 去掉 \r\n
// 解析: 方法、路径、版本
if (!parse_http_request_line(conn, str, readbytes - 2)) {
hio_close(io);
return;
}
// 结果: method="GET", path="/ping", major=1, minor=1
// 继续读取 HTTP 头部
conn->state = s_head;
hio_readline(io);
break;
// ========== 状态2: 解析 HTTP 头 ==========
case s_head:
// 输入: "Content-Length: 12\r\n"
// "Content-Type: text/plain\r\n"
// "\r\n" ← 空行表示头部结束
if (readbytes == 2 && str[0] == '\r' && str[1] == '\n') {
// 头部结束
conn->state = s_head_end;
} else {
str[readbytes - 2] = '\0';
if (!parse_http_head(conn, str, readbytes - 2)) {
hio_close(io);
return;
}
hio_readline(io); // 继续读下一行
break;
}
// ========== 状态3: 头部结束,检查 Body ==========
case s_head_end:
if (req->content_length == 0) {
// 无 Body,直接处理请求
conn->state = s_end;
goto s_end;
} else {
// 有 Body,读取指定长度
conn->state = s_body;
hio_readbytes(io, req->content_length);
// ↓ 精确读取 content_length 字节
break;
}
// ========== 状态4: 读取 Body ==========
case s_body:
req->body = str;
req->body_len += readbytes;
if (req->body_len == req->content_length) {
// Body 读取完成
conn->state = s_end;
} else {
// Body 太大,需要多次读取
break;
}
// ========== 状态5: 请求完成,处理业务 ==========
case s_end:
s_end:
// 调用业务处理函数
on_request(conn);
if (hio_is_closed(io)) return;
if (req->keepalive) {
// HTTP/1.1 Keep-Alive: 重用连接
memset(&conn->request, 0, sizeof(http_msg_t));
memset(&conn->response, 0, sizeof(http_msg_t));
conn->state = s_first_line;
hio_readline(io); // 等待下一个请求
} else {
// HTTP/1.0 或 Connection: close
hio_close(io);
}
break;
}
}
s_begin
↓
s_first_line ← 读取 "GET /ping HTTP/1.1\r\n"
↓ parse_http_request_line
s_head ← 读取 "Content-Length: 12\r\n"
↓ parse_http_head
↓ 遇到 "\r\n"
s_head_end
↓ 检查 content_length
├─ 0 → s_end (无 Body)
└─ > 0 → s_body
↓ hio_readbytes(content_length)
s_body ← 读取 Body 数据
↓ body_len == content_length
s_end
↓ on_request
├─ keepalive=1 → 重置状态,等待下一个请求
└─ keepalive=0 → hio_close
再往后就是业务处理(on_request)了
static int on_request(http_conn_t* conn) {
http_msg_t* req = &conn->request;
// ========== 路由表 ==========
if (strcmp(req->method, "GET") == 0) {
// GET /ping
if (strcmp(req->path, "/ping") == 0) {
http_reply(conn, 200, "OK", TEXT_PLAIN, "pong", 4);
return 200;
}
// GET / 或 GET /index.html
return http_serve_file(conn);
} else if (strcmp(req->method, "POST") == 0) {
// POST /echo
if (strcmp(req->path, "/echo") == 0) {
// 回显请求 Body
http_reply(conn, 200, "OK", req->content_type,
req->body, req->content_length);
return 200;
}
}
// 未实现的方法
http_reply(conn, 501, NOT_IMPLEMENTED, TEXT_HTML,
HTML_TAG_BEGIN NOT_IMPLEMENTED HTML_TAG_END, 0);
return 501;
}
然后是发送响应,通过调用 hio_write 实现
static int http_reply(http_conn_t* conn,
int status_code, const char* status_message,
const char* content_type,
const char* body, int body_len) {
http_msg_t* req = &conn->request;
http_msg_t* resp = &conn->response;
// 1. 构造响应头
resp->major_version = req->major_version;
resp->minor_version = req->minor_version;
resp->status_code = status_code;
strncpy(resp->status_message, status_message, 63);
strncpy(resp->content_type, content_type, 63);
resp->keepalive = req->keepalive;
if (body) {
if (body_len <= 0) body_len = strlen(body);
resp->content_length = body_len;
resp->body = (char*)body;
}
// 2. 序列化 HTTP 响应
char* buf = NULL;
STACK_OR_HEAP_ALLOC(buf, HTTP_MAX_HEAD_LENGTH + resp->content_length,
HTTP_MAX_HEAD_LENGTH + 1024);
int msglen = http_response_dump(resp, buf,
HTTP_MAX_HEAD_LENGTH + resp->content_length);
// 3. 发送响应
int nwrite = hio_write(conn->io, buf, msglen);
STACK_OR_HEAP_FREE(buf);
return nwrite < 0 ? nwrite : msglen;
}
响应格式像这样
HTTP/1.1 200 OK
Server: libhv/1.3.0
Connection: keep-alive
Content-Length: 4
Content-Type: text/plain
Date: Mon, 13 Oct 2025 12:34:56 GMT
pong