/ TCP

用 epoll 的 echo 程序

完整代码放在最前面,只是一个简单的 echo 程序,没有业务逻辑,收到数据之后立即发送回去。然而,还是有些细节值得记录一下,关于 TCP Server 通常需要设置的 TCP 套接字属性,屏蔽 SIGPIPE 信号,以及最重要的:如何使用 epoll,ET 模式和 LT 模式是怎样的。

#include <stdio.h>
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <signal.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <assert.h>

#define MAX_EVENT 1024
#define BACKLOG 128
#define MAX_DATALEN 1024

typedef struct sockaddr SA;

int set_tcp_reuse(int sock)
{
    int opt = 1;
    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt));
}

int set_tcp_nodelay(int sock)
{
    int opt = 1;
    return setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt));
}

int set_non_block(int fd)
{
    int flags;
    flags = fcntl(fd, F_GETFL, NULL);
    if (flags == -1) {
        perror("fcntl F_GETFL error");
        return -1;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl F_SETFL error");
        return -1;
    }
    return 0;
}

int ignore_sigpipe()
{
    struct sigaction sa;
    memset(&sa, 0, sizeof(struct sigaction));
    sa.sa_handler = SIG_IGN;
    if (sigaction(SIGPIPE, &sa, NULL)) {
        perror("sigaction error");
        return -1;
    }
    return 0;
}

int create_tcp_server(const char *ip, uint16_t port, int backlog)
{
    int ret = -1;
    socklen_t len = 0;

    if (ignore_sigpipe()) {
        printf("setting ignore sigpipe failed\n");
        return -1;
    }

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("create socket error");
        return -1;
    }
    struct sockaddr_in addr;
    addr.sin_port = htons(port);
    addr.sin_family = AF_INET;
    if (!ip) {
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
    } else {
        ret = inet_pton(AF_INET, ip, (SA *)&addr.sin_addr);
        if (ret <= 0) {
            if (ret == 0) {
                fprintf(stderr, "not invalid ip: [%s]\n", ip);
            } else {
                perror("inet_pton error");
            }
            return -1;
        }
    }

    if (set_tcp_reuse(sock) == -1) {
        perror("setsockopt SO_REUSEADDR error");
        return -1;
    }

    if (set_tcp_nodelay(sock) == -1) {
        perror("setsockopt TCP_NODELAY error");
        return -1;
    }

    len = sizeof(SA);
    if (bind(sock, (SA *)&addr, len) == -1) {
        perror("bind error");
        return -1;
    }

    if (listen(sock, backlog) == -1) {
        perror("listen error");
        return -1;
    }

    return sock;
}

int set_read_event(int epfd, int ctl, int fd)
{
    struct epoll_event ev;
    ev.data.u64 = 0;
    ev.data.fd = fd;
    ev.events = EPOLLIN | EPOLLET;

    if (epoll_ctl(epfd, ctl, fd, &ev) == -1) {
        perror("epoll_ctl error");
        return -1;
    }
    return 0;
}

int handle_recv(int fd)
{
    int read = 0;
    int n = 0;
    int ret = 0;
    char buf[MAX_DATALEN + 1] = {0};
    int space = 0;
    size_t cap = MAX_DATALEN;
    while (1) {
        n = recv(fd, buf + read, cap - read, 0);
        if (n < 0) {
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                break;
            } else {
                ret = -1;
                break;
            }
        } else if (n == 0) {
            ret = -1;
            break;
        } else {
            space = cap - read;
            read += n;
            if (n == space) {
                if (cap - read == 0) {
                    break;
                }
                continue;
            } else {
                break;
            }
        }
    }

    buf[read] = '\0';

    if (ret < 0) {
        return -1;
    }

    // not a good way, send can fail.
    send(fd, buf, read, 0);
    return 0;
}

int main(int argc, char *argv[])
{
    const char *ip = NULL;
    uint16_t port = 32667;
    int srv = create_tcp_server(ip, port, BACKLOG);
    if (srv < 0) {
        fprintf(stderr, "create tcp server failed\n");
        return -1;
    }

    struct epoll_event ev, events[MAX_EVENT];
    int epfd = -1;
    if ((epfd = epoll_create(1)) == -1) {
        perror("epoll_create error");
        return -1;
    }
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = srv;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, srv, &ev) == -1) {
        perror("epoll_ctl add srv error");
        close(srv);
        return -1;
    }
    int i, ret, cli, fd;
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(SA);
    while (1) {
        ret = epoll_wait(epfd, events, MAX_EVENT, -1);
        if (ret > 0) {
            for (i = 0; i < ret; i++) {
                fd = events[i].data.fd;
                if (events[i].events & EPOLLIN) {
                    if (fd == srv) {
                        cli = accept(srv, (SA *)&addr, &addrlen);
                        if (cli == -1) {
                            perror("accept error");
                        } else {
                            if (set_non_block(cli) == -1) {
                                fprintf(stderr, "set cli non block failed\n");
                                close(cli);
                                continue;
                            }
                            if (set_read_event(epfd, EPOLL_CTL_ADD, cli) != 0) {
                                close(cli);
                                continue;
                            }
                        }
                    } else {
                        if (handle_recv(fd) == -1) {
                            if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL)) {
                                perror("epoll_ctl EPOLL_CTL_DEL cli error");
                            }
                            close(fd);
                            continue;
                        }
                    }
                }
            }
        } else if (ret == 0) {
            continue;
        } else {
            perror("epoll_wait error");
            continue;
        }
    }
    return 0;
}

下面是对代码的一些解释。

TCP Server 的三个“必要”的设置

set_tcp_reuse 函数设置了 TCP 的 SO_REUSEADDR,以免进程重启时一直无法重新监听端口。

set_tcp_nodelay 函数设置了 TCP_NODELAY 属性,关闭 TCP 的 Nagle 算法,是为了让信息尽快地真正发送出去。如果开启 Nagle 算法,有可能会多等一会才把数据真正发出去。这里的代码需要 #include &lt;netinet/tcp.h&gt; 头文件。

ignore_sigpipe 函数用来忽略 SIGPIPE 信号,对于 SIGPIPE 信号,默认的处理是退出当前进程。当客户端关闭了连接,服务端仍然在调用 send/write 发送数据,就会产生 SIGPIPE 信号,这显然是不能接受的。

epoll 的 ET 模式

使用 epoll 的默认 LT 模式,基本上不用担心 recv/read 读不全数据,内核缓冲区里只要有数据,就会得到 EPOLLIN 事件,通知进程去读数据,不需要循环调用 recv 保证读到所有数据。

使用 ET 模式的工作方式 (EPOLLET) 跟 LT 有一点区别,当 socket 可读时,只会通知一次,如果收到通知之后没有读完数据,不会再通知。为了保证数据读完,需要循环读一直到某些条件再跳出循环。

如何保证数据全部读完

下面一段代码是个例子:

// fd 为非阻塞套接字
while (1) {
    n = read(fd, buf, bufsiz, 0);
    if (n < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            break;
        } else {
            // .....
        }
    } else if (n == 0) {
        break;    
    } else {
        if (n < bufsiz) {
            break;
        } else {
            continue;
        }
    }
}

从代码中可以看到跳出循环的条件:

  1. recv 返回错误是 EAGAINEWOULDBLOCK 时,说明已经没有数据可读。当设置套接字为非阻塞模式,或者设置了超时时间,才会有这样的错误。
  2. recv 返回了 0,说明对方已经关闭连接,服务端自然也不再继续读它,而是要关闭对应的套接字。
  3. recv 返回的值小于预定的 buffer 的大小(n &lt; bufsiz),说明内核缓冲区里还剩下 n 个字节数据,都已经读完了,不需要继续读了。

如果 n == bufsiz 说明还可能有数据没读完,就要进入下一个循环。也有可能恰好这次读了 bufsiz 字节的数据,而且再没有数据了,这样下一个循环调用 recv 时就会遇到上述的第一个条件: EAGAINEWOULDBLOCK 错误。

设想一下,如果是用的默认的阻塞套接字,也没有设置超时时间,这次 recv 是得不到 EAGAINEWOULDBLOCK 错误的,程序被一直阻塞在这里,所以 EPOLLET 要和非阻塞套接字配合使用

没读完数据会怎样

使用 ET 模式,一次 EPOLLIN 时间之后,即使没读完数据,也不再通知程序去读数据。一直到下一次网卡又接收到了数据,程序又收到通知,然后调用 recv,这里先读到的是上一次没读完的在内核缓冲区里地数据,或许会产生一些预料之外的问题。

具体的一个试验

场景(S 表示服务端,C 表示客户端)

  • 监听 EPOLLIN|EPOLLET 事件(使用 ET 模式)
  • S 端 每次 recv 接收 5 字节的数据

过程:

  1. C 发送 10 字节数据 “helloworld” 到 S,S 网卡收到这 10 字节数据,EPOLLIN 触发,S recv 5 字节数据”hello”,仍然有 5 字节数据在内核缓冲区内,不再通知
  2. C 又发送 5 字节数据 “12345” 到 S,S 网卡收到这 5 字节数据,EPOLLIN 触发,S 继续 recv 5 字节,收到的是 “world”,上一次发送没从内核缓冲区取完的数据。
  3. C 又发送 3 字节数据 “xyz” 到 S,S 网卡收到这 3 字节数据,EPOLLIN 触发,S 继续 recv 5 字节,收到 “12345”
  4. C 又发生 1 字节数据 “a” 到 S,S 网卡收到这 1 字节数据,EPOLLIN 触发,S recv 之后,recv 返回值为 4,收到的是 “xyza”
  5. C 又发送 10 字节数据 “helloworld” 到 S,S 收到 “hello”
  6. C 关闭连接退出,再次触发 EPOLLIN 事件,S 调用 recv,收到 “world”。

如果是一个 echo 程序,那么客户端每次发送了消息之后,“反射”回来的内容都不是预期的了,并且在上面这个试验中,最后一步,S 错过了关闭套接字的好机会。如果程序有更复杂的业务逻辑已经把人搞得焦头烂额,再出现这个例子的情况,或许会使人痛苦不堪。

另外两点

继续以上面的试验为例,如果因为某些原因,S 一直没去读 C 发送的数据, C 仍然按照上面的方式发送了几次数据,总共 23 个字节的数据,都被放在 S 的内核缓冲区内,这样就只会被触发一次,S 只会收到最初的 5 个字节。

还有一点要提一下。当触发一次 EPOLLIN 之后,在处理过程中,而且这次处理有没有读完的数据,又调用了 epoll_ctl,(重复)设置监听 EPOLLIN 事件,那么 EPOLLIN 是会被再次触发的。

比如在最开始的代码中这样修改

int handle_recv(int epfd, int fd)
{
    ....
    ....
    set_read_event(epfd, fd);
    return send(fd, buf, read, 0);
}

处理完这一次 EPOLLIN 事件,又重新设置去监听 EPOLLIN 事件,内核缓冲区里有数据,那就会再来一次 EPOLLIN(好像 LT 模式),这是个奇怪的用法。这里只是为了描述重复设置 EPOLLIN 事件会出现的情况。