用 epoll 的 echo 程序

完整代码放在最前面,只是一个简单的 echo 程序,没有业务逻辑,收到数据之后立即发送回去。然而,还是有些细节值得记录一下,关于 TCP Server 通常需要设置的 TCP 套接字属性,屏蔽 SIGPIPE 信号,以及最重要的:如何使用 epoll,ET 模式和 LT 模式是怎样的。

#include <stdio.h>
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <signal.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <assert.h>

#define MAX_EVENT 1024
#define BACKLOG 128
#define MAX_DATALEN 1024

typedef struct sockaddr SA;

int set_tcp_reuse(int sock)
{
    int opt = 1;
    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt));
}

int set_tcp_nodelay(int sock)
{
    int opt = 1;
    return setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt));
}

int set_non_block(int fd)
{
    int flags;
    flags = fcntl(fd, F_GETFL, NULL);
    if (flags == -1) {
        perror("fcntl F_GETFL error");
        return -1;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl F_SETFL error");
        return -1;
    }
    return 0;
}

int ignore_sigpipe()
{
    struct sigaction sa;
    memset(&sa, 0, sizeof(struct sigaction));
    sa.sa_handler = SIG_IGN;
    if (sigaction(SIGPIPE, &sa, NULL)) {
        perror("sigaction error");
        return -1;
    }
    return 0;
}

int create_tcp_server(const char *ip, uint16_t port, int backlog)
{
    int ret = -1;
    socklen_t len = 0;

    if (ignore_sigpipe()) {
        printf("setting ignore sigpipe failed\n");
        return -1;
    }

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("create socket error");
        return -1;
    }
    struct sockaddr_in addr;
    addr.sin_port = htons(port);
    addr.sin_family = AF_INET;
    if (!ip) {
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
    } else {
        ret = inet_pton(AF_INET, ip, (SA *)&addr.sin_addr);
        if (ret <= 0) {
            if (ret == 0) {
                fprintf(stderr, "not invalid ip: [%s]\n", ip);
            } else {
                perror("inet_pton error");
            }
            return -1;
        }
    }

    if (set_tcp_reuse(sock) == -1) {
        perror("setsockopt SO_REUSEADDR error");
        return -1;
    }

    if (set_tcp_nodelay(sock) == -1) {
        perror("setsockopt TCP_NODELAY error");
        return -1;
    }

    len = sizeof(SA);
    if (bind(sock, (SA *)&addr, len) == -1) {
        perror("bind error");
        return -1;
    }

    if (listen(sock, backlog) == -1) {
        perror("listen error");
        return -1;
    }

    return sock;
}

int set_read_event(int epfd, int ctl, int fd)
{
    struct epoll_event ev;
    ev.data.u64 = 0;
    ev.data.fd = fd;
    ev.events = EPOLLIN | EPOLLET;

    if (epoll_ctl(epfd, ctl, fd, &ev) == -1) {
        perror("epoll_ctl error");
        return -1;
    }
    return 0;
}

int handle_recv(int fd)
{
    int read = 0;
    int n = 0;
    int ret = 0;
    char buf[MAX_DATALEN + 1] = {0};
    int space = 0;
    size_t cap = MAX_DATALEN;
    while (1) {
        n = recv(fd, buf + read, cap - read, 0);
        if (n < 0) {
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                break;
            } else {
                ret = -1;
                break;
            }
        } else if (n == 0) {
            ret = -1;
            break;
        } else {
            space = cap - read;
            read += n;
            if (n == space) {
                if (cap - read == 0) {
                    break;
                }
                continue;
            } else {
                break;
            }
        }
    }

    buf[read] = '\0';

    if (ret < 0) {
        return -1;
    }

    // not a good way, send can fail.
    send(fd, buf, read, 0);
    return 0;
}

int main(int argc, char *argv[])
{
    const char *ip = NULL;
    uint16_t port = 32667;
    int srv = create_tcp_server(ip, port, BACKLOG);
    if (srv < 0) {
        fprintf(stderr, "create tcp server failed\n");
        return -1;
    }

    struct epoll_event ev, events[MAX_EVENT];
    int epfd = -1;
    if ((epfd = epoll_create(1)) == -1) {
        perror("epoll_create error");
        return -1;
    }
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = srv;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, srv, &ev) == -1) {
        perror("epoll_ctl add srv error");
        close(srv);
        return -1;
    }
    int i, ret, cli, fd;
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(SA);
    while (1) {
        ret = epoll_wait(epfd, events, MAX_EVENT, -1);
        if (ret > 0) {
            for (i = 0; i < ret; i++) {
                fd = events[i].data.fd;
                if (events[i].events & EPOLLIN) {
                    if (fd == srv) {
                        cli = accept(srv, (SA *)&addr, &addrlen);
                        if (cli == -1) {
                            perror("accept error");
                        } else {
                            if (set_non_block(cli) == -1) {
                                fprintf(stderr, "set cli non block failed\n");
                                close(cli);
                                continue;
                            }
                            if (set_read_event(epfd, EPOLL_CTL_ADD, cli) != 0) {
                                close(cli);
                                continue;
                            }
                        }
                    } else {
                        if (handle_recv(fd) == -1) {
                            if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL)) {
                                perror("epoll_ctl EPOLL_CTL_DEL cli error");
                            }
                            close(fd);
                            continue;
                        }
                    }
                }
            }
        } else if (ret == 0) {
            continue;
        } else {
            perror("epoll_wait error");
            continue;
        }
    }
    return 0;
}

下面是对代码的一些解释。

TCP Server 的三个“必要”的设置

set_tcp_reuse 函数设置了 TCP 的 SO_REUSEADDR,以免进程重启时一直无法重新监听端口。

set_tcp_nodelay 函数设置了 TCP_NODELAY 属性,关闭 TCP 的 Nagle 算法,是为了让信息尽快地真正发送出去。如果开启 Nagle 算法,有可能会多等一会才把数据真正发出去。这里的代码需要 #include &lt;netinet/tcp.h&gt; 头文件。

ignore_sigpipe 函数用来忽略 SIGPIPE 信号,对于 SIGPIPE 信号,默认的处理是退出当前进程。当客户端关闭了连接,服务端仍然在调用 send/write 发送数据,就会产生 SIGPIPE 信号,这显然是不能接受的。

epoll 的 ET 模式

使用 epoll 的默认 LT 模式,基本上不用担心 recv/read 读不全数据,内核缓冲区里只要有数据,就会得到 EPOLLIN 事件,通知进程去读数据,不需要循环调用 recv 保证读到所有数据。

使用 ET 模式的工作方式 (EPOLLET) 跟 LT 有一点区别,当 socket 可读时,只会通知一次,如果收到通知之后没有读完数据,不会再通知。为了保证数据读完,需要循环读一直到某些条件再跳出循环。

如何保证数据全部读完

下面一段代码是个例子:

// fd 为非阻塞套接字
while (1) {
    n = read(fd, buf, bufsiz, 0);
    if (n < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            break;
        } else {
            // .....
        }
    } else if (n == 0) {
        break;    
    } else {
        if (n < bufsiz) {
            break;
        } else {
            continue;
        }
    }
}

从代码中可以看到跳出循环的条件:

  1. recv 返回错误是 EAGAINEWOULDBLOCK 时,说明已经没有数据可读。当设置套接字为非阻塞模式,或者设置了超时时间,才会有这样的错误。
  2. recv 返回了 0,说明对方已经关闭连接,服务端自然也不再继续读它,而是要关闭对应的套接字。
  3. recv 返回的值小于预定的 buffer 的大小(n &lt; bufsiz),说明内核缓冲区里还剩下 n 个字节数据,都已经读完了,不需要继续读了。

如果 n == bufsiz 说明还可能有数据没读完,就要进入下一个循环。也有可能恰好这次读了 bufsiz 字节的数据,而且再没有数据了,这样下一个循环调用 recv 时就会遇到上述的第一个条件: EAGAINEWOULDBLOCK 错误。

设想一下,如果是用的默认的阻塞套接字,也没有设置超时时间,这次 recv 是得不到 EAGAINEWOULDBLOCK 错误的,程序被一直阻塞在这里,所以 EPOLLET 要和非阻塞套接字配合使用

没读完数据会怎样

使用 ET 模式,一次 EPOLLIN 时间之后,即使没读完数据,也不再通知程序去读数据。一直到下一次网卡又接收到了数据,程序又收到通知,然后调用 recv,这里先读到的是上一次没读完的在内核缓冲区里地数据,或许会产生一些预料之外的问题。

具体的一个试验

场景(S 表示服务端,C 表示客户端)

  • 监听 EPOLLIN|EPOLLET 事件(使用 ET 模式)
  • S 端 每次 recv 接收 5 字节的数据

过程:

  1. C 发送 10 字节数据 “helloworld” 到 S,S 网卡收到这 10 字节数据,EPOLLIN 触发,S recv 5 字节数据”hello”,仍然有 5 字节数据在内核缓冲区内,不再通知
  2. C 又发送 5 字节数据 “12345” 到 S,S 网卡收到这 5 字节数据,EPOLLIN 触发,S 继续 recv 5 字节,收到的是 “world”,上一次发送没从内核缓冲区取完的数据。
  3. C 又发送 3 字节数据 “xyz” 到 S,S 网卡收到这 3 字节数据,EPOLLIN 触发,S 继续 recv 5 字节,收到 “12345”
  4. C 又发生 1 字节数据 “a” 到 S,S 网卡收到这 1 字节数据,EPOLLIN 触发,S recv 之后,recv 返回值为 4,收到的是 “xyza”
  5. C 又发送 10 字节数据 “helloworld” 到 S,S 收到 “hello”
  6. C 关闭连接退出,再次触发 EPOLLIN 事件,S 调用 recv,收到 “world”。

如果是一个 echo 程序,那么客户端每次发送了消息之后,“反射”回来的内容都不是预期的了,并且在上面这个试验中,最后一步,S 错过了关闭套接字的好机会。如果程序有更复杂的业务逻辑已经把人搞得焦头烂额,再出现这个例子的情况,或许会使人痛苦不堪。

另外两点

继续以上面的试验为例,如果因为某些原因,S 一直没去读 C 发送的数据, C 仍然按照上面的方式发送了几次数据,总共 23 个字节的数据,都被放在 S 的内核缓冲区内,这样就只会被触发一次,S 只会收到最初的 5 个字节。

还有一点要提一下。当触发一次 EPOLLIN 之后,在处理过程中,而且这次处理有没有读完的数据,又调用了 epoll_ctl,(重复)设置监听 EPOLLIN 事件,那么 EPOLLIN 是会被再次触发的。

比如在最开始的代码中这样修改

int handle_recv(int epfd, int fd)
{
    ....
    ....
    set_read_event(epfd, fd);
    return send(fd, buf, read, 0);
}

处理完这一次 EPOLLIN 事件,又重新设置去监听 EPOLLIN 事件,内核缓冲区里有数据,那就会再来一次 EPOLLIN(好像 LT 模式),这是个奇怪的用法。这里只是为了描述重复设置 EPOLLIN 事件会出现的情况。

Subscribe to Atom

Don’t miss out on the latest issues. Sign up now to get access to the library of members-only issues.
jamie@example.com
Subscribe