1.1 1.2 1.3 1.3.1 1.3.2 1.3.3 1.3.4 1.3.5 1.3.6 1.4 1.4.1 1.4.2 1.5 1.5.1 1.5.2 1.5.3 1.5.4 1.5.5 1.6 1.6.1 1.6.2 1.6.3 1.7 1.7.1 1.7.2 1.7.3 1.7.4 Libevent深入浅出
1 Libevent官方 2 epoll
2.1 流-IO操作-阻塞
2.2 解决阻塞死等待的办法 2.3 什么是epoll
2.4 epollAPI 2.5 触发模式
2.6 简单的epoll服务器 3 epoll和reactor
3.1 reactor反应堆模式 3.2 epoll的反应堆模式实现 4 event_base
4.1 创建event_base 4.2 检查event_base后端 4.3 释放event_base 4.4 event_base优先级 4.5 event_base和fork 5 事件循环event_loop
5.1 运行循环 5.2 停止循环
5.3 转储event_base的状态 6 事件event
6.1 创建事件
6.2 事件的未决和非未决 6.3 事件的优先级
6.4 检查事件状态
1.7.7 1.8 1.8.1 1.8.2 1.8.3 1.8.4 1.8.5 1.8.5.1 1.8.5.2 1.8.5.3 1.8.5.4 1.9 1.9.1 1.9.2 1.9.3 1.9.4 1.9.5 1.9.6 1.10 1.10.1 1.10.2 1.10.3 1.10.4 1.10.5 1.11 1.11.1 1.11.2 1.11.3 6.7 事件状态之间的转换
7 数据缓冲Bufferevent 7.1 回调和水位 7.2 延迟回调
7.3 bufferevent 选项标志 7.4 使用bufferevent 7.5 通用bufferevent操作
7.5.1 释放bufferevent操作
7.5.2 操作回调、水位和启用/禁用 7.5.3 操作bufferevent中的数据 7.5.4 bufferevent的清空操作 8 数据封装evBuffer
8.1 创建和释放evbuffer 8.2 evbuffer与线程安全 8.3 检查evbuffer
8.4 向evbuffer添加数据 8.5 evbuffer数据移动 8.6 添加数据到evbuffer前 8 链接监听器evconnlistener
8.1 创建和释放 evconnlistener 8.2 启用和禁用 evconnlistener 8.3 调整 evconnlistener 的回调函数 8.4 检测 evconnlistener
8.5 侦测错误 9 libevent常用设置
9.1 日志消息回调设置 9.2 致命错误回调设置 9.3 内存管理回调设置
1.12 1.12.1 1.12.2 1.12.3 1.12.4 1.12.5 10 基于libevent服务器
10.1 Hello_World服务器(基于信号) 10.2 基于事件服务器
10.3 回显服务器
10.3 libevent实现http服务器 10.4 libevent实现TCP/IP服务器
Libevent
本教程要求有一定的服务并发编程基础,了解select和epoll等多路I/O复用机制。
教程目的主要是快速建立libevent的认知,了解libevent的常用数据结构和编程方 法。
达到可以使用libevent写出自己的高并发服务器处理模型。
作者:刘丹冰
传智播客C++学院
1 Libevent官方
官方网站:http://libevent.org/
libevent版本一共有1.4系列和2.0系列两个稳定版本。
1.4系列比较古老,但是源码简单,适合源码的学习 2.0系列比较新,见识直接使用2.0
需要注意的是,1.4系列和2.0系列两个版本的接口并不兼容,就是2.0将一些接口的 原型发>生了改变,所以将1.4升级到2.0需要重新编码。
1.1 libevent 特点
事件驱动,高性能;
轻量级,专注于网络;
跨平台,支持 Windows、Linux、Mac Os等;
支持多种 I/O多路复用技术, epoll、poll、dev/poll、select 和kqueue 等;
支持 I/O,定时器和信号等事件;
1.2 libevent下载与安装
在官网上找到 libevent-2.0.22-stable.tar.gz 下载地址。
tar -zxvf libevent-2.0.22-stable.tar.gz
cd libevent-2.0.22-stable/
./configure
make
sudo make install
注意
如果在libevent安装目录make之后会生成一个.libs/, 里面如果没有
libevent_openssl.so说明系统没有安装openssl库。 但是如果安装了,依然没有 这个文件生成,可能需要制定openssl路径
ln -s /usr/local/ssl/include/openssl /usr/include/openssl
1.3 libevent开源包
在 .libs 隐藏文件中包含全部libevent已经编译好的so文件。
其中core为libevent的核心文件,libevent.so为主链接文件,会关联到其他全部so文 件。
在sample目录下会有已经编译好的服务器应用程序。
可以拿 hello-world 程序用来测试。
服务端:
./hello-world
客户端:
netcat 192.168.2.105 9995
如果客户端收到“hello world”字符串,表示libevent在本机可以正常使用。
2 EPOLL
本章主要简述epoll的基础理论知识。方便理解libevent。
2.1 流?I\/O操作\/阻塞
2.1.1 流
可以进行I\/O操作的内核对象 文件、管道、套接字……
流的入口:文件描述符(fd)
2.1.2 I\/O操作
所有对流的读写操作,我们都可以称之为IO操作。
那么当一个流中再没有数据,read的时候,或者说 在流中已经写满了数据,再write,我们的IO操作就 会出现一种现象,就是阻塞现象。
2.1.2 阻塞
阻塞等待
空出大脑可以安心睡觉。(不占用CPU宝贵的时间片)
非阻塞,忙轮询
浪费时间,浪费电话费,占用快递员时间(占用CPU,系统资源)
2.2 解决阻塞死等待的办法 2.2.1 阻塞死等待的缺点
2.2.2 办法一:非阻塞、忙轮询
while true {
for i in 流[] { if i has 数据 { 读 或者 其他处理 }
} }
2.2.3 办法二:select
select 代收员 比较懒,她只会告诉你快递到了,但是是谁到的,你需要挨个快递员 问一遍。
while true {
select(流[]); //阻塞
for i in 流[] { if i has 数据 { 读 或者 其他处理 }
} }
2.2.3 办法三:epoll
while true {
可处理的流[] = epoll_wait(epoll_fd); //阻塞
for i in 可处理的流[] { 读 或者 其他处理 }
}
2.3 什么是epoll
与select,poll一样,对I\/O多路复用的技术
只关心“活跃”的链接,无需遍历全部描述符集合 能够处理大量的链接请求(系统可以打开的文件数目)
2.4 epoll API
2.4.1 创建EPOLL
/**
* @param size 告诉内核监听的数目 *
* @returns 返回一个epoll句柄(即一个文件描述符)
*/
int epoll_create(int size);
int epfd = epoll_create(1000);
2.4.2 控制EPOLL
/**
* @param epfd 用epoll_create所创建的epoll句柄 * @param op 表示对epoll监控描述符控制的动作 *
* EPOLL_CTL_ADD(注册新的fd到epfd)
* EPOLL_CTL_MOD(修改已经注册的fd的监听事件) * EPOLL_CTL_DEL(epfd删除一个fd)
*
* @param fd 需要监听的文件描述符 * @param event 告诉内核需要监听的事件 *
* @returns 成功返回0,失败返回-1, errno查看错误信息 */
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
struct epoll_event {
__uint32_t events; /* epoll 事件 */
epoll_data_t data; /* 用户传递的数据 */
}
/*
* events : {EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLHUP, EPOLLET, EPOLLONESHOT}
*/
typedef union epoll_data { void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event new_event;
new_event.events = EPOLLIN | EPOLLOUT;
new_event.data.fd = 5;
epoll_ctl(epfd, EPOLL_CTL_ADD, 5, &new_event);
2.4.3 等待EPOLL
/**
*
* @param epfd 用epoll_create所创建的epoll句柄 * @param event 从内核得到的事件集合
* @param maxevents 告知内核这个events有多大,
* 注意: 值 不能大于创建epoll_create()时的size.
* @param timeout 超时时间 * -1: 永久阻塞
* 0: 立即返回,非阻塞 * >0: 指定微秒
*
* @returns 成功: 有多少文件描述符就绪,时间到时返回0 * 失败: -1, errno 查看错误
*/
int epoll_wait(int epfd, struct epoll_event *event, int maxevents, int timeout);
struct epoll_event my_event[1000];
int event_cnt = epoll_wait(epfd, my_event, 1000, -1);
2.4.4 epoll编程框架
//创建 epoll
int epfd = epoll_crete(1000);
//将 listen_fd 添加进 epoll 中
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd,&listen_event);
while (1) {
//阻塞等待 epoll 中 的fd 触发
int active_cnt = epoll_wait(epfd, events, 1000, -1);
for (i = 0 ; i < active_cnt; i++) {
if (evnets[i].data.fd == listen_fd) {
//accept. 并且将新accept 的fd 加进epoll中.
}
else if (events[i].events & EPOLLIN) { //对此fd 进行读操作
}
else if (events[i].events & EPOLLOUT) { //对此fd 进行写操作
} } }
2.5 触发模式
2.5.1 水平触发
水平触发的主要特点是,如果用户在监听epoll事件,当内核有事件的时候,会拷贝 给用户态事件,但是如果用户只处理了一次,那么剩下没有处理的会在下一次 epoll_wait再次返回该事件。
这样如果用户永远不处理这个事件,就导致每次都会有该事件从内核到用户的拷 贝,耗费性能,但是水平触发相对安全,最起码事件不会丢掉,除非用户处理完 毕。
2.5.2 边缘触发
边缘触发,相对跟水平触发相反,当内核有事件到达, 只会通知用户一次,至于用 户处理还是不处理,以后将不会再通知。这样减少了拷贝过程,增加了性能,但是 相对来说,如果用户马虎忘记处理,将会产生事件丢的情况。
2.6 epoll服务器 2.6.1 服务端
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define SERVER_PORT (7778)
#define EPOLL_MAX_NUM (2048)
#define BUFFER_MAX_LEN (4096)
char buffer[BUFFER_MAX_LEN];
void str_toupper(char *str) {
int i;
for (i = 0; i < strlen(str); i ++) { str[i] = toupper(str[i]);
} }
int main(int argc, char **argv) {
int listen_fd = 0;
int client_fd = 0;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;
socklen_t client_len;
int epfd = 0;
struct epoll_event event, *my_events;
// socket
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
// bind
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(SERVER_PORT);
bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(serve r_addr));
// listen
listen(listen_fd, 10);
// epoll create
epfd = epoll_create(EPOLL_MAX_NUM);
if (epfd < 0) {
perror("epoll create");
goto END;
}
// listen_fd -> epoll event.events = EPOLLIN;
event.data.fd = listen_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event) < 0) { perror("epoll ctl add listen_fd ");
goto END;
}
my_events = malloc(sizeof(struct epoll_event) * EPOLL_MAX_NU M);
while (1) {
// epoll wait
int active_fds_cnt = epoll_wait(epfd, my_events, EPOLL_M
AX_NUM, -1);
int i = 0;
for (i = 0; i < active_fds_cnt; i++) { // if fd == listen_fd
if (my_events[i].data.fd == listen_fd) { //accept
client_fd = accept(listen_fd, (struct sockaddr*)
&client_addr, &client_len);
if (client_fd < 0) { perror("accept");
continue;
}
char ip[20];
printf("new connection[%s:%d]\n", inet_ntop(AF_I NET, &client_addr.sin_addr, ip, sizeof(ip)), ntohs(client_addr.s in_port));
event.events = EPOLLIN | EPOLLET;
event.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &event );
}
else if (my_events[i].events & EPOLLIN) { printf("EPOLLIN\n");
client_fd = my_events[i].data.fd;
// do read
buffer[0] = '\0';
int n = read(client_fd, buffer, 5);
if (n < 0) {
perror("read");
continue;
}
else if (n == 0) {
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &e vent);
close(client_fd);
}
else {
printf("[read]: %s\n", buffer);
buffer[n] = '\0';
#if 1
str_toupper(buffer);
write(client_fd, buffer, strlen(buffer));
printf("[write]: %s\n", buffer);
memset(buffer, 0, BUFFER_MAX_LEN);
#endif
/*
event.events = EPOLLOUT;
event.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, client_fd, &e vent);
*/
} }
else if (my_events[i].events & EPOLLOUT) { printf("EPOLLOUT\n");
/*
client_fd = my_events[i].data.fd;
str_toupper(buffer);
write(client_fd, buffer, strlen(buffer));
printf("[write]: %s\n", buffer);
memset(buffer, 0, BUFFER_MAX_LEN);
event.events = EPOLLIN;
event.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, client_fd, &event );
*/
} } }
END:
close(epfd);
close(listen_fd);
return 0;
}
2.6.2 客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#define MAX_LINE (1024)
#define SERVER_PORT (7778)
void setnoblocking(int fd) {
int opts = 0;
opts = fcntl(fd, F_GETFL);
opts = opts | O_NONBLOCK;
fcntl(fd, F_SETFL);
}
int main(int argc, char **argv) {
int sockfd;
char recvline[MAX_LINE + 1] = {0};
struct sockaddr_in server_addr;
if (argc != 2) {
fprintf(stderr, "usage ./client <SERVER_IP>\n");
exit(0);
}
// 创建socket
if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "socket error");
exit(0);
}
// server addr 赋值
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
if (inet_pton(AF_INET, argv[1], &server_addr.sin_addr) <= 0) {
fprintf(stderr, "inet_pton error for %s", argv[1]);
exit(0);
}
// 链接服务端
if (connect(sockfd, (struct sockaddr*) &server_addr, sizeof(
server_addr)) < 0) {
perror("connect");
fprintf(stderr, "connect error\n");
exit(0);
}
setnoblocking(sockfd);
char input[100];
int n = 0;
int count = 0;
// 不断的从标准输入字符串
while (fgets(input, 100, stdin) != NULL)
{
printf("[send] %s\n", input);
n = 0;
// 把输入的字符串发送 到 服务器中去
n = send(sockfd, input, strlen(input), 0);
if (n < 0) {
perror("send");
}
n = 0;
count = 0;
// 读取 服务器返回的数据 while (1)
{
n = read(sockfd, recvline + count, MAX_LINE);
if (n == MAX_LINE) {
count += n;
continue;
}
else if (n < 0){
perror("recv");
break;
}
else {
count += n;
recvline[count] = '\0';
printf("[recv] %s\n", recvline);
break;
} } }
return 0;
}
3.1 reactor反应堆模式
对每一个构架模式的分析,我们都使用参考文献的分析风格,着重分析意图、上下 文、问题、解决方案、结构和实现 6个方面的内容。
1. 意图
在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度
(dispatch)给应用程序。
2. 上下文
在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。
3. 问题
在分布式系统尤其是服务器这一类事件驱动应用中,虽然这些请求最终会被序列化 地处理,但是必须时刻准备着处理多个同时到来的服务请求。在实际应用 中,这些 请求总是通过一个事件(如CONNECTOR、READ、WRITE等)来表示的。在有 序地处理这些服务请求之前,应用程序必须先分离和调度这些 同时到达的事件。为 了有效地解决这个问题,我们需要做到以下4方面:
为了提高系统的可测量性和反应时间,应用程序不能长时间阻塞在某个事件源上而 停止对其他事件的处理,这样会严重降低对客户端的响应度。 为了提高吞吐量,任 何没有必要的上下文切换、同步和CPU之间的数据移动都要避免。 引进新的服务或 改良已有的服务都要对既有的事件分离和调度机制带来尽可能小的影响。 大量的应 用程序代码需要隐藏在复杂的多线程和同步机制之后。
4. 解决方案
在一个或多个事件源上等待事件的到来,例如,一个已经连接的Socket描述符就是 一个事件源。将事件的分离和调度整合到处理它的服务中,而将分离和调度机制从 应用程序对特定事件的处理中分离开,也就是说分离和调度机制与特定的应用程序
无关。
具体来说,每个应用程序提供的每个服务都有一个独立的事件处理器与之对应。由 事件处理器处理来自事件源的特定类型的事件。每个事件处理器都事先注册 到 Reactor管理器中。Reactor管理器使用同步事件分离器在一个或多个事件源中等待 事件的发生。当事件发生后,同步事件分离器通知 Reactor管理器,最后由Reactor 管理器调度和该事件相关的事件处理器来完成请求的服务。
5. 结构
在Reactor模式中,有5个关键的参与者。
描述符(handle):由操作系统提供,用于识别每一个事件,如Socket描述符、文 件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端 的连接请求、数据等。事件也可以来自内部,如定时器事件。 同步事件分离器
(demultiplexer):是一个函数,用来等待一个或多个事件的发生。调用者会被阻 塞,直到分离器分离的描述符集上有事件发生。Linux的select函数是一个经常被使 用的分离器。 事件处理器接口(event handler):是由一个或多个模板函数组成的 接口。这些模板函数描述了和应用程序相关的对某个事件的操作。 具体的事件处理 器:是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事 件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服 务。 Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,
以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核 心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,
Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函 数来处理这个事件。 通过上述分析,我们注意到,是Reactor管理器而不是应用程 序负责等待事件、分离事件和调度事件。实际上,Reactor管理器并没有被具体的 事件处理器调用,而是管理器调度具体的事件处理器,由事件处理器对发生的事件 做出处理。这就是类似Hollywood原则的“反向控制”。应用程序要做的 仅仅是实现 一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理 器来完成。这些参与者的相互关系如图2-1所示。
现在结合第1章分析的框架五元素来看一下Reactor构架模式的参与者与框架五元素 之间的关系:Reactor构架模式的具体实现对应了元素1; 事件处理器接口对应元素 2;具体的事件处理器对应元素3;Reactor管理器使用了Hollywood原则,可以认为 和元素5对应;元素4的功能相对不 明显,没有明确的对应关系。
如果还是没有理解Reactor构架模式,没有关系,源代码会说明所有问题。此时可 再分析一遍Reactor构架模式,然后继续以下内容。
3.2 epoll的反应堆模式实现
epoll反应堆模式的实现-也就是libevent的实现原理。
#include <stdlib.h>
#include <stdio.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#define MAX_EVENTS 1024
#define BUFLEN 128
#define SERV_PORT 8080
/*
* status:1表示在监听事件中,0表示不在
* last_active:记录最后一次响应时间,做超时处理 */
struct myevent_s {
int fd; //cfd listenfd int events; //EPOLLIN EPLLOUT void *arg; //指向自己结构体指针
void (*call_back)(int fd, int events, void *arg);
int status;
char buf[BUFLEN];
int len;
long last_active;
};
int g_efd; /* epoll_create返回的句柄 */
struct myevent_s g_events[MAX_EVENTS+1]; /* +1 最后一个用于 list en fd */
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int , int, void *), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
//memset(ev->buf, 0, sizeof(ev->buf));
//ev->len = 0;
ev->last_active = time(NULL);
return;
}
void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);
void eventadd(int efd, int events, struct myevent_s *ev) {
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events;
if (ev->status == 1) { op = EPOLL_CTL_MOD;
} else {
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if (epoll_ctl(efd, op, ev->fd, &epv) < 0)
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
else
printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev-
>fd, op, events);
return;
}
void eventdel(int efd, struct myevent_s *ev) {
struct epoll_event epv = {0, {0}};
if (ev->status != 1) return;
epv.data.ptr = ev;
ev->status = 0;
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);
return;
}
void acceptconn(int lfd, int events, void *arg) {
struct sockaddr_in cin;
socklen_t len = sizeof(cin);
int cfd, i;
if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1 ) {
if (errno != EAGAIN && errno != EINTR) { /* 暂时不做出错处理 */
}
printf("%s: accept, %s\n", __func__, strerror(errno));
return;
}
do {
for (i = 0; i < MAX_EVENTS; i++) { if (g_events[i].status == 0) break;
}
if (i == MAX_EVENTS) {
printf("%s: max connect limit[%d]\n", __func__, MAX_
EVENTS);
break;
}
int flag = 0;
if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %s\n", __func_
_, strerror(errno));
break;
}
eventset(&g_events[i], cfd, recvdata, &g_events[i]);
eventadd(g_efd, EPOLLIN, &g_events[i]);
} while(0);
printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa (cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i)
;
return;
}
void recvdata(int fd, int events, void *arg) {
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = recv(fd, ev->buf, sizeof(ev->buf), 0);
eventdel(g_efd, ev);
if (len > 0) { ev->len = len;
ev->buf[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buf);
/* 转换为发送事件 */
eventset(ev, fd, senddata, ev);
eventadd(g_efd, EPOLLOUT, ev);
}
else if (len == 0) { close(ev->fd);
/* ev-g_events 地址相减得到偏移元素位置 */
printf("[fd=%d] pos[%d], closed\n", fd, (int)(ev - g_eve nts));
}
else {
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror (errno));
}
return;
}
void senddata(int fd, int events, void *arg) {
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = send(fd, ev->buf, ev->len, 0);
//printf("fd=%d\tev->buf=%s\ttev->len=%d\n", fd, ev->buf, ev ->len);
//printf("send len = %d\n", len);
eventdel(g_efd, ev);
if (len > 0) {
printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
eventset(ev, fd, recvdata, ev);
eventadd(g_efd, EPOLLIN, ev);
}
else {
close(ev->fd);
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
return;
}
void initlistensocket(int efd, short port) {
int lfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(lfd, F_SETFL, O_NONBLOCK);
eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[M AX_EVENTS]);
eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);
bind(lfd, (struct sockaddr *)&sin, sizeof(sin));
listen(lfd, 20);
return;
}
int main(int argc, char *argv[]) {
unsigned short port = SERV_PORT;
if (argc == 2)
port = atoi(argv[1]);
g_efd = epoll_create(MAX_EVENTS+1);
if (g_efd <= 0)
printf("create efd in %s err %s\n", __func__, strerror(e rrno));
initlistensocket(g_efd, port);
/* 事件循环 */
struct epoll_event events[MAX_EVENTS+1];
printf("server running:port[%d]\n", port);
int checkpos = 0, i;
while (1) {
/* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没 有和服务器通信,则关闭此客户端链接 */
long now = time(NULL);
for (i = 0; i < 100; i++, checkpos++) { if (checkpos == MAX_EVENTS)
checkpos = 0;
if (g_events[checkpos].status != 1) continue;
long duration = now - g_events[checkpos].last_active
;
if (duration >= 60) {
close(g_events[checkpos].fd);
printf("[fd=%d] timeout\n", g_events[checkpos].f d);
eventdel(g_efd, &g_events[checkpos]);
} }
/* 等待事件发生 */
int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
if (nfd < 0) {
printf("epoll_wait error, exit\n");
break;
}
for (i = 0; i < nfd; i++) {
struct myevent_s *ev = (struct myevent_s *)events[i]
.data.ptr;
if ((events[i].events & EPOLLIN) && (ev->events & EP OLLIN)) {
ev->call_back(ev->fd, events[i].events, ev->arg)
;
}
if ((events[i].events & EPOLLOUT) && (ev->events & E POLLOUT)) {
ev->call_back(ev->fd, events[i].events, ev->arg)
;
} }
}
/* 退出前释放所有资源 */
return 0;
}
4 event_base
本章主要来源《libevent参考手册(中文版)》。
使用 libevent 函数之前需要分配一个或者多个 event_base 结构体。每个 event_base 结构 体持有一个事件集合,可以检测以确定哪个事件是激活的。
如果设置 event_base 使用锁,则可以安全地在多个线程中访问它 。然而,其事件循 环只能 运行在一个线程中。如果需要用多个线程检测 IO,则需要为每个线程使用一 个 event_base。
每个 event_base 都有一种用于检测哪种事件已经就绪的 “方法”,或者说后端。可以 识别的方法有:
select poll epoll kqueue devpoll evport win32
4.1 创建event_base
4.1.1 创建默认的event_base
event_base_new()函数分配并且返回一个新的具有默认设置的 event_base。函数 会检测环境变量,返回一个到 event_base 的指针。如果发生错误,则返回 NULL。选 择各种方法时,函数会选择 OS 支持的最快方法。
struct event_base *event_base_new(void);
大多数程序使用这个函数就够了。
event_base_new()函数声明在中,首次出现在 libevent 1.4.3版。
4.1.2 创建复杂的event_base
要对取得什么类型的 event_base 有更多的控制,就需要使用 event_config。
event_config 是一个容纳 event_base 配置信息的不透明结构体。需要 event_base 时,将 event_config 传递给event_base_new_with_config ()。
创建接口
struct event_config *event_config_new(void);
struct event_base *
event_base_new_with_config(const struct event_config *cfg);
void event_config_free(struct event_config *cfg);
要使用这些函数分配 event_base,先调用 event_config_new()分配一个
event_config。 然后,对 event_config 调用其它函数,设置所需要的 event_base 特 征。最后,调用 event_base_new_with_config()获取新的 event_base。完成工作后, 使用 event_config_free ()释放 event_config。
int event_config_avoid_method(struct event_config *cfg, const ch ar *method);
enum event_method_feature { EV_FEATURE_ET = 0x01, EV_FEATURE_O1 = 0x02, EV_FEATURE_FDS = 0x04, };
int event_config_require_features(struct event_config *cfg,
enum event_method_feature feat ure);
enum event_base_config_flag {
EVENT_BASE_FLAG_NOLOCK = 0x01, EVENT_BASE_FLAG_IGNORE_ENV = 0x02, EVENT_BASE_FLAG_STARTUP_IOCP = 0x04, EVENT_BASE_FLAG_NO_CACHE_TIME = 0x08,
EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST = 0x10, EVENT_BASE_FLAG_PRECISE_TIMER = 0x20
};
int event_config_set_flag(struct event_config *cfg, enum event_base_config_flag flag);
调用 event_config_avoid_method ()可以通过名字让 libevent 避免使用特定的可用 后端 。 调用 event_config_require_feature ()让 libevent 不使用不能提供所有指定 特征的后端。 调用 event_config_set_flag()让 libevent 在创建 event_base 时设置 一个或者多个将在下面介绍的运行时标志。
event_config_require_features()可识别的特征值有:
EV_FEATURE_ET:要求支持边沿触发的后端
EV_FEATURE_O1:要求添加、删除单个事件,或者确定哪个事件激活的操作是 O(1)复杂度的后端
EV_FEATURE_FDS:要求支持任意文件描述符,而不仅仅是套接字的后端 event_config_set_flag()可识别的选项值有:
EVENT_BASE_FLAG_NOLOCK :不要为 event_base 分配锁。设置这个选项 可以 为 event_base 节省一点用于锁定和解锁的时间,但是让在多个线程中访问
event_base 成为不安全的。
EVENTBASE_FLAG_IGNORE_ENV :选择使用的后端时,不要检测 EVENT* 环 境 变量。使用这个标志需要三思:这会让用户更难调试你的程序与 libevent 的 交互。
EVENT_BASE_FLAG_STARTUP_IOCP:仅用于 Windows,让 libevent 在启动 时就 启用任何必需的 IOCP 分发逻辑,而不是按需启用。
EVENT_BASE_FLAG_NO_CACHE_TIME :不是在事件循环每次准备执行超时 回调时 检测当前时间,而是在每次超时回调后进行检测。注意:这会消耗更多的 CPU时间。
EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST :告诉 libevent ,如果决定 使 用 epoll 后端,可以安全地使用更快的基于 changelist 的后端。epoll-
changelist 后端可以 在后端的分发函数调用之间,同样的 fd 多次修改其状态的 情况下,避免不必要的系统 调用。但是如果传递任何使用 dup()或者其变体克隆 的 fd 给 libevent,epoll-changelist 后端会触发一个内核 bug,导致不正确的结 果。在不使用 epoll 后端的情况下,这个标 志是没有效果的。也可以通过设置 EVENT_EPOLL_USE_CHANGELIST 环境变量来 打开 epoll-changelist 选 项。
上述操作 event_config 的函数都在成功时返回0,失败时返回-1。
设置 event_config,请求 OS 不能提供的后端是很容易的 。比如说,对于 libevent 2.0.1-alpha, 在 Windows 中是没有 O(1)后端的;在 Linux 中也没有同时提供 EV_FEATURE_FDS 和 EV_FEATURE_O1 特征的后端。如果创建了 libevent 不能满足的配置, event_base_new_with_config ()会返回 NULL。
4.2 检查event_base后端
有时候需要检查 event_base 支持哪些特征,或者当前使用哪种方法。
接口1
const char **event_get_supported_methods(void);
event_get_supported_methods()函数返回一个指针 ,指向 libevent 支持的方法名字 数组 。 这个数组的最后一个元素是 NULL。
实例:
int i;
const char **methods = event_get_supported_methods();
printf("Starting Libevent %s. Available methods are:\n", event_get_version());
for (i=0; methods[i] != NULL; ++i) { printf(" %s\n", methods[i]);
}
这个函数返回 libevent 被编译以支持的方法列表 。然而 libevent 运行的时候, 操作系统可能 不能支持所有方法。比如说,可能 OS X 版本中的 kqueue 的 bug 太多,无法使用。
接口2
const char *
event_base_get_method(const struct event_base *base);
enum event_method_feature
event_base_get_features(const struct event_base *base);
event_base_get_method()返回 event_base 正在使用的方法。
event_base_get_features ()返回 event_base 支持的特征的比特掩码。
实例
struct event_base *base;
enum event_method_feature f;
base = event_base_new();
if (!base) {
puts("Couldn't get an event_base!");
} else {
printf("Using Libevent with backend method %s.", event_base_get_method(base));
f = event_base_get_features(base);
if ((f & EV_FEATURE_ET))
printf(" Edge-triggered events are supported.");
if ((f & EV_FEATURE_O1))
printf(" O(1) event notification is supported.");
if ((f & EV_FEATURE_FDS))
printf(" All FD types are supported.");
puts("");
}
4.3 释放event_base
使用完 event_base 之后,使用 event_base_free()进行释放。
void event_base_free(struct event_base *base);
注意:这个函数不会释放当前与 event_base 关联的任何事件,或者关闭他们的套 接字 ,或 者释放任何指针。
event_base_free()定义在中,首次由 libevent 1.2实现。
4.4 event_base优先级
libevent支持为事件设置多个优先级。然而, event_base默认只支持单个优先级。可 以调用 event_base_priority_init()设置 event_base 的优先级数目。
int event_base_priority_init(struct event_base *base, int n_prio rities);
成功时这个函数返回 0,失败时返回 -1。base 是要修改的 event_base,n_priorities 是要支 持的优先级数目,这个数目至少是 1 。每个新的事件可用的优先级将从 0 (最 高) 到 n_priorities-1(最低)。
常量 EVENT_MAX_PRIORITIES 表示 n_priorities 的上限。调用这个函数时为 n_priorities 给出更大的值是错误的。
必须在任何事件激活之前调用这个函数,最好在创建 event_base 后立刻调用。
4.5 event_base和fork
不是所有事件后端都在调用 fork()之后可以正确工作。所以,如果在使用 fork()或者 其 他相关系统调用启动新进程之后,希望在新进程中继续使用 event_base,就需要进 行重新初始化。
int event_reinit(struct event_base *base);
成功时这个函数返回 0,失败时返回 -1。
实例
struct event_base *base = event_base_new();
/* ... add some events to the event_base ... */
if (fork()) {
/* In parent */
continue_running_parent(base); /*...*/
} else {
/* In child */
event_reinit(base);
continue_running_child(base); /*...*/
}
event_reinit()定义在中,在 libevent 1.4.3-alpha 版中首次可用。
5 事件循环event_loop
5.1 运行循环
一旦有了一个已经注册了某些事件的 event_base(关于如何创建和注册事件请看下 一节 ), 就需要让 libevent 等待事件并且通知事件的发生。
#define EVLOOP_ONCE 0x01
#define EVLOOP_NONBLOCK 0x02
#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
int event_base_loop(struct event_base *base, int flags);
默认情况下,event_base_loop()函数运行 event_base 直到其中没有已经注册的事件 为止。执行循环的时候 ,函数重复地检查是否有任何已经注册的事件被触发 (比如 说,读事件 的文件描述符已经就绪,可以读取了;或者超时事件的超时时间即将到达 )。如果有事件被触发,函数标记被触发的事件为 “激活的”,并且执行这些事件。
在 flags 参数中设置一个或者多个标志就可以改变 event_base_loop()的行为。如果 设置了 EVLOOP_ONCE ,循环将等待某些事件成为激活的 ,执行激活的事件直到没 有更多的事件可以执行,然会返回。如果设置了 EVLOOP_NONBLOCK,循环不会等 待事件被触发: 循环将仅仅检测是否有事件已经就绪,可以立即触发,如果有,则执行事 件的回调。
完成工作后,如果正常退出, event_base_loop()返回0;如果因为后端中的某些未处理 错误而退出,则返回 -1。
为帮助理解,这里给出 event_base_loop()的算法概要:
while (any events are registered with the loop, or EVLOOP_NO_EXIT_ON_EMPTY was set) {
if (EVLOOP_NONBLOCK was set, or any events are already activ e)
If any registered events have triggered, mark them activ e.
else
Wait until at least one event has triggered, and mark it active.
for (p = 0; p < n_priorities; ++p) {
if (any event with priority of p is active) { Run all active events with priority of p.
break; /* Do not run any events of a less important pr iority */
} }
if (EVLOOP_ONCE was set or EVLOOP_NONBLOCK was set) break;
}
为方便起见,也可以调用
int event_base_dispatch(struct event_base *base);
event_base_dispatch ()等同于没有设置标志的 event_base_loop ( )。所以, event_base_dispatch ()将一直运行,直到没有已经注册的事件了,或者调用 了 event_base_loopbreak()或者 event_base_loopexit()为止。
这些函数定义在中,从 libevent 1.0版就存在了。
5.2 停止循环
如果想在移除所有已注册的事件之前停止活动的事件循环,可以调用两个稍有不同的 函数 。
int event_base_loopexit(struct event_base *base, const struct timeval *tv);
int event_base_loopbreak(struct event_base *base);
event_base_loopexit()让 event_base 在给定时间之后停止循环。如果 tv 参数为 NULL, event_base 会立即停止循环,没有延时。
如果 event_base 当前正在执行任何激活事件的回调,则回调会继续运行,直到运行完 所有激活事件的回调之才退出。
event_base_loopbreak ()让 event_base 立即退出循环。它与
event_base_loopexit (base,NULL)的不同在于,如果 event_base 当前正在执行激活 事件的回调 ,它将在执行完当前正在处理的事件后立即退出。
注意 event_base_loopexit(base,NULL) 和 event_base_loopbreak(base) 在事 件循环没有运行时的行为不同:前者安排下一次事件循环在下一轮回调完成后立 即停止(就好像带 EVLOOP_ONCE 标志调用一样);后者却仅仅停止当前正在运 行的循环,如果事件循环没 有运行,则没有任何效果。
这两个函数都在成功时返回 0,失败时返回 -1。
实例:
#include <event2/event.h>
/* Here's a callback function that calls loopbreak */
void cb(int sock, short what, void *arg) {
struct event_base *base = arg;
event_base_loopbreak(base);
}
void main_loop(struct event_base *base, evutil_socket_t watchdog _fd)
{
struct event *watchdog_event;
/* Construct a new event to trigger whenever there are any b ytes to
read from a watchdog socket. When that happens, we'll ca ll the
cb function, which will make the loop exit immediately wi thout
running any other active events at all.
*/
watchdog_event = event_new(base, watchdog_fd, EV_READ, cb, b ase);
event_add(watchdog_event, NULL);
event_base_dispatch(base);
}
实例2:执行事件循环10秒,然后退出
#include <event2/event.h>
void run_base_with_ticks(struct event_base *base) {
struct timeval ten_sec;
ten_sec.tv_sec = 10;
ten_sec.tv_usec = 0;
/* Now we run the event_base for a series of 10-second interva ls, printing
"Tick" after each. For a much better way to implement a 10 -second
timer, see the section below about persistent timer events.
*/
while (1) {
/* This schedules an exit ten seconds from now. */
event_base_loopexit(base, &ten_sec);
event_base_dispatch(base);
puts("Tick");
} }
有时候需要知道对 event_base_dispatch()或者 event_base_loop()的调用是正常退 出 的,还是因为调用 event_base_loopexit()或者 event_base_break()而退出的。可 以调 用下述函数来确定是否调用了 loopexit 或者 break函数。
int event_base_got_exit(struct event_base *base);
int event_base_got_break(struct event_base *base);
这两个函数分别会在循环是因为调用 event_base_loopexit()或者
event_base_break()而退出的时候返回 true,否则返回 false。下次启动事件循环的 时候,这些值会被重设。
这些函数声明在中。
event_break_loopexit()函数首次在 libevent 1.0c 版本 中实现;
event_break_loopbreak()首次在 libevent 1.4.3版本中实现。
5.3 转储event_base的状态
为帮助调试程序(或者调试 libevent),有时候可能需要加入到 event_base 的事件及其 状态 的完整列表。调用 event_base_dump_events()可以将这个列表输出到指定的 文件中。
void event_base_dump_events(struct event_base *base, FILE *f);
这个列表是人可读的,未来版本的 libevent 将会改变其格式。
6 事件event
libevent 的基本操作单元是事件。每个事件代表一组条件的集合,这些条件包括:
文件描述符已经就绪,可以读取或者写入
文件描述符变为就绪状态,可以读取或者写入(仅对于边沿触发 IO) 超时事件
发生某信号 用户触发事件
所有事件具有相似的生命周期。调用 libevent 函数设置事件并且关联到
event_base 之后, 事件进入“已初始化(initialized)”状态。此时可以将事件添加到 event_base 中,这使之进入“未决(pending)”状态。在未决状态下,如果触发事件的条 件发生(比如说,文件描述 符的状态改变,或者超时时间到达 ),则事件进入“激活 (active)”状态,(用户提供的)事件回调函数将被执行。如果配置为“持久的
(persistent)”,事件将保持为未决状态。否则, 执行完回调后,事件不再是未决的。删 除操作可以让未决事件成为非未决(已初始化)的 ; 添加操作可以让非未决事件再次 成为未决的。
6.1 创建事件
6.1.1 生成新事件
使用 event_new()接口创建事件。
#define EV_TIMEOUT 0x01
#define EV_READ 0x02
#define EV_WRITE 0x04
#define EV_SIGNAL 0x08
#define EV_PERSIST 0x10
#define EV_ET 0x20
typedef void (*event_callback_fn)(evutil_socket_t, short, void *)
;
struct event *event_new(struct event_base *base, evutil_socket_t fd,
short what, event_callback_fn cb, void *arg);
void event_free(struct event *event);
event_new()试图分配和构造一个用于 base 的新的事件。 what 参数是上述标志的 集合。
如果 fd 非负,则它是将被观察其读写事件的文件。
事件被激活时, libevent 将调用 cb 函数,
传递这些参数:文件描述符 fd,表示所有被触发事件的位字段 ,以及构造事件时的 arg 参数。
发生内部错误,或者传入无效参数时, event_new()将返回 NULL。
所有新创建的事件都处于已初始化和非未决状态 ,调用 event_add()可以使其成 为未决的。
要释放事件,调用 event_free()。对未决或者激活状态的事件调用 event_free()是安 全 的:在释放事件之前,函数将会使事件成为非激活和非未决的。
实例:
#include <event2/event.h>
void cb_func(evutil_socket_t fd, short what, void *arg) {
const char *data = arg;
printf("Got an event on socket %d:%s%s%s%s [%s]", (int) fd,
(what&EV_TIMEOUT) ? " timeout" : "", (what&EV_READ) ? " read" : "", (what&EV_WRITE) ? " write" : "", (what&EV_SIGNAL) ? " signal" : "", data);
}
void main_loop(evutil_socket_t fd1, evutil_socket_t fd2) {
struct event *ev1, *ev2;
struct timeval five_seconds = {5,0};
struct event_base *base = event_base_new();
/* The caller has already set up fd1, fd2 somehow, and m ake them
nonblocking. */
ev1 = event_new(base, fd1, EV_TIMEOUT|EV_READ|EV_PERSIST , cb_func,
(char*)"Reading event");
ev2 = event_new(base, fd2, EV_WRITE|EV_PERSIST, cb_func, (char*)"Writing event");
event_add(ev1, &five_seconds);
event_add(ev2, NULL);
event_base_dispatch(base);
}
上述函数定义在 中,首次出现在 libevent 2.0.1-alpha 版本中。 event_callback_fn 类 型首次在2.0.4-alpha 版本中作为 typedef 出现。
6.1.2 事件标志
EV_TIMEOUT
这个标志表示某超时时间流逝后事件成为激活的。构造事件的时候,EV_TIMEOUT 标志是 被忽略的:可以在添加事件的时候设置超时 ,也可以不设置。超时发生时,回调 函数的 what 参数将带有这个标志。
EV_READ
表示指定的文件描述符已经就绪,可以读取的时候,事件将成为激活的。
EV_WRITE
表示指定的文件描述符已经就绪,可以写入的时候,事件将成为激活的。
EV_SIGNAL 用于实现信号检测,请看下面的 “构造信号事件”节。
EV_PERSIST 表示事件是“持久的”,请看下面的“关于事件持久性”节。
EV_ET
表示如果底层的 event_base 后端支持边沿触发事件,则事件应该是边沿触发的。这 个标志 影响 EV_READ 和 EV_WRITE 的语义。
从2.0.1-alpha 版本开始,可以有任意多个事件因为同样的条件而未决。比如说,可以 有两 个事件因为某个给定的 fd 已经就绪,可以读取而成为激活的。这种情况下,多个 事件回调 被执行的次序是不确定的。
这些标志定义在中。除了 EV_ET 在2.0.1-alpha 版本中引入外,所有标志 从1.0 版本开始就存在了。
6.1.3 关于事件持久性
默认情况下,每当未决事件成为激活的(因为 fd 已经准备好读取或者写入,或者因为超 时), 事件将在其回调被执行前成为非未决的。如果想让事件再次成为未决的 ,可以 在回调函数中 再次对其调用 event_add()。
然而,如果设置了 EV_PERSIST 标志,事件就是持久的。这意味着即使其回调被激活 ,事件还是会保持为未决状态 。如果想在回调中让事件成为非未决的 ,可以对其调用 event_del ()。
每次执行事件回调的时候,持久事件的超时值会被复位。因此,如果具有
EV_READ|EV_PERSIST 标志,以及5秒的超时值,则事件将在以下情况下成为激活 的:
套接字已经准备好被读取的时候
从最后一次成为激活的开始,已经逝去 5秒
6.1.4 信号事件
libevent 也可以监测 POSIX 风格的信号。要构造信号处理器,使用:
#define evsignal_new(base, signum, cb, arg) \
event_new(base, signum, EV_SIGNAL|EV_PERSIST, cb, arg)
除了提供一个信号编号代替文件描述符之外,各个参数与 event_new()相同。
实例
struct event *hup_event;
struct event_base *base = event_base_new();
/* call sighup_function on a HUP signal */
hup_event = evsignal_new(base, SIGHUP, sighup_function, NULL);
注意 :信号回调是信号发生后在事件循环中被执行的,所以可以安全地调用通常 不能 在 POSIX 风格信号处理器中使用的函数。
警告 :不要在信号事件上设置超时,这可能是不被支持的。 [待修正:真是这样的吗?]
libevent 也提供了一组方便使用的宏用于处理信号事件: