UNIX网络编程从零起步(6): 高并发非阻塞服务器端

尽管我们已经通过pre-threaded的手段提进一步提高了并发服务器端程序的性能, 但是它们仍然不适用于需要高并发的环境. 由于采用的是同步I/O的模型, 线程池里的每个工作线程一次迭代只能处理一个连接. 但是日志记录服务器又要提供长连接功能, 这就导致当长连接数超过预先定义的工作线程数目时, 服务器无法处理之后的连接. 增加线程数目除了耗费多的内存并不能解决这个问题. 唯一的解决办法就是使用非阻塞的I/O模型, 让工作线程在一次迭代中能处理很多的连接.

检查server4.c中的work_thr函数, 我们发现导致线程阻塞的两个地方是调用accept和read(暂时不考虑write可能导致的阻塞, 认为cfd永远都是write ready的). 因此必须将lfd和cfd都设置为NONBLOCK. POSIX提供I/O复用的两个接口select和poll. 前面提到过多个线程同时调用select监视同一个fd会带来更严重问题. 此外, 无论是select还是poll, 其性能随着监视文件描述符的增加而急剧下降(O(N2)). 要编写能够同时处理大量连接(~10k+)的服务器, 用POSIX提供的接口是无法满足这样的要求的.所幸的是基本上UNIX/UNIX-like的操作系统几乎都各自提供了系统相关的I/O复用系统调用, 如freeBSD的kqueue, Solaris的/dev/poll, Linux的epoll, 也有将这些系统相关的接口包装起来的库, 如libevent. 我们的终极服务器端程序直接采用epoll, 当然它也只能运行在LINUX的操作系统上(内核版本大于等于2.6).

服务器server5.c代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include "server.h"
#include <sys/epoll.h>
 
#define MAXEVENTS 10000
#define NTHREADS 4
 
int fd;
int lfd;
pthread_mutex_t lfd_mtx = PTHREAD_MUTEX_INITIALIZER;
 
void *work_thr(void *);
 
int
main(int argc, char *argv[])
{
    int i;
    char *logpath = (char *) malloc(PATHSIZE);
    char *servname = (char *) malloc(SERVNAMESIZE);
    pthread_t tid[NTHREADS];
    int flags;
 
    if (argc == 1) {
        snprintf(logpath, PATHSIZE, "server.log");
        snprintf(servname, SERVNAMESIZE, "5000");
    } else if (argc == 2) {
        snprintf(logpath, PATHSIZE, "server.log");
        snprintf(servname, SERVNAMESIZE, "%s", argv[1]);
    } else if (argc == 3) {
        snprintf(logpath, PATHSIZE, "%s", argv[1]);
        snprintf(servname, SERVNAMESIZE, "%s", argv[2]);
    } else {
        fprintf(stderr, "Usage:\n");
        fprintf(stderr, "%s [log file path] [server port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }
 
    if ((fd = open_logfile(logpath)) < 0) {
        fprintf(stderr, "open_logfd error.\n");
        exit(EXIT_FAILURE);
    }
 
    if ((lfd = tcp_listen(NULL, servname, NULL, NULL)) < 0) {
        fprintf(stderr, "tcp_listen error.\n");
        exit(EXIT_FAILURE);
    }
 
    flags = fcntl(lfd, F_GETFL);
    fcntl(lfd, F_SETFL, flags | O_NONBLOCK);
 
    for (i = 0; i < NTHREADS; i++)
        pthread_create(&tid[i], NULL, work_thr, NULL);
 
    for (i = 0; i < NTHREADS; i++) 
        pthread_join(tid[i], NULL);
 
    free(logpath);
    free(servname);
    return 0;
}
 
void *
work_thr(void *arg)
{
    int cfd, efd;
    ssize_t n;
    int epfd;
    struct epoll_event event;
    struct epoll_event *events = (struct epoll_event *) malloc(sizeof(struct epoll_event));
    char *logbuf = (char *) malloc(LOGSIZE);
    int ret;
    int i;
    int flags;
    int ncfds = 0;
    int timeout;
 
    epfd = epoll_create(10000);
    event.data.fd = lfd;
    event.events = EPOLLIN | EPOLLONESHOT;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event);
 
    for (; ;) {
        if (ncfds == 0)
            timeout = 10000000;
        else
            timeout = 1;
 
        ret = epoll_wait(epfd, events, MAXEVENTS, timeout);
        for (i = 0; i < ret; i++) {
            efd = events[i].data.fd;
            if (lfd == efd) {
                pthread_mutex_lock(&lfd_mtx);
                for (; ;) {
                    if ((cfd = accept(lfd, NULL, NULL)) >= 0) {
                        flags = fcntl(cfd, F_GETFL);
                        fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
                        event.data.fd = cfd;
                        event.events = EPOLLIN;
                        epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event);
                        ncfds++;
                    } else {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            break;
                        }
                    }
                }
                event.data.fd = lfd;
                event.events = EPOLLIN | EPOLLONESHOT;
                epoll_ctl(epfd, EPOLL_CTL_MOD, lfd, &event);
                pthread_mutex_unlock(&lfd_mtx);
            } else {
                cfd = efd;
                memset(logbuf, '\0', LOGSIZE);
                n = readn(cfd, logbuf, LOGSIZE);
                if (n == LOGSIZE) {
                    write(fd, logbuf, strlen(logbuf));
                } else {
                    ncfds--;
                    epoll_ctl(epfd, EPOLL_CTL_DEL, cfd, &event);
                    close(cfd);
                }
            }
        }
    }
 
    free(events);
    free(logbuf);
    close(epfd);
    return NULL;
}

主函数main和server4.c一模一样, 只是在第47行将lfd设置成非阻塞模式. 当客户端发起连接时, lfd会被标记为可读, 因此在第78行添加监视lfd可读. 同时也设置了EPOLLONESHOT, 这是因为epoll默认水平触发(level-triger), 只要有一个连接发起, 不管后续是否还有连接, lfd永远可读. EPOLLONESHOT让lfd可读事件触发一次后就从epoll中撤销(不是删除). epoll通知有事件发生后, 从第88行开始处理事件. 如果有连接发生, 那么我们就调用accept. 需要提醒的是, 从epoll返回(87行)到探测到有连接发起(90行), 可能连接被发起了很多次, 为了不丢掉任何一个发起的连接, 在第92行需要循环调用accept, 直到完成所有的连接, 也就是accept非阻塞lfd返回-1并且errno被设置为EAGAIN或者EWOULDBLOCK为止. 同时, 如果cfd不小于0, 把cfd也把设置成非阻塞并加到epoll监视中(94行). cfd同样也是默认水平触发, 这是因为要支持长连接. 如果采用边缘触发, 就要一直读到客户端终止连接, 这又回到了阻塞模型上去了. accept完了不要忘了lfd重新加到epoll中(108行, 注意不是EPOLL_CTL_ADD而是EPOLL_CTL_MOD). epoll同样有惊群现象, 它的帮助手册明确说明如果多个线程用epoll同时监视同一个fd的同一个事件, 有当事件发生时, 所有的epoll_wait都会返回. 这里, 我们忽略掉epoll的惊群, 但是为了性能, 仍然在accept上加锁. 处理客户端连接部分比较简单, 可读的时候就读取, 客户端连接关闭, 服务器端也被动关闭. 在第118行将要关闭的cfd从epoll中删除. 这步并不是必须, 因为关闭一个文件描述符后, 它会被自动从epoll重删除.
编译server5.c

gcc server5.c tcp_listen.c readn.c open_logfile.c -lpthread -Wall -O2 -o serv5

至此我们最后的多线程异步日志记录服务器程序完成. 这个程序是在目前操作系统运行的最高性能的服务器端. 接下来要做的事情只是在它的基础上添添补补, 加一些辅助的功能让它更为实用一些.

此外, 日志一部服务器程序也用kqueue实现, 并在MAC OS X系统下测试, server5b.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include "server.h"
#include <sys/event.h>
#include <sys/types.h>
#include <sys/time.h>
 
#define MAXEVENTS 10000
#define NTHREADS 4
 
#define IN_USE     1
#define NOT_IN_USE 0
 
int fd;
int lfd;
pthread_mutex_t lfd_mtx = PTHREAD_MUTEX_INITIALIZER;
 
void *work_thr(void *);
 
int
main(int argc, char *argv[])
{
    int i;
    char *logpath = (char *) malloc(PATHSIZE);
    char *servname = (char *) malloc(SERVNAMESIZE);
    pthread_t tid[NTHREADS];
    int flags;
 
    if (argc == 1) {
        snprintf(logpath, PATHSIZE, "server.log");
        snprintf(servname, SERVNAMESIZE, "5000");
    } else if (argc == 2) {
        snprintf(logpath, PATHSIZE, "server.log");
        snprintf(servname, SERVNAMESIZE, "%s", argv[1]);
    } else if (argc == 3) {
        snprintf(logpath, PATHSIZE, "%s", argv[1]);
        snprintf(servname, SERVNAMESIZE, "%s", argv[2]);
    } else {
        fprintf(stderr, "Usage:\n");
        fprintf(stderr, "%s [log file path] [server port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }
 
    if ((fd = open_logfile(logpath)) < 0) {
        fprintf(stderr, "open_logfd error.\n");
        exit(EXIT_FAILURE);
    }
 
    if ((lfd = tcp_listen(NULL, servname, NULL, NULL)) < 0) {
        fprintf(stderr, "tcp_listen error.\n");
        exit(EXIT_FAILURE);
    }
 
    flags = fcntl(lfd, F_GETFL);
    fcntl(lfd, F_SETFL, flags | O_NONBLOCK);
 
    for (i = 0; i < NTHREADS; i++)
        pthread_create(&tid[i], NULL, work_thr, NULL);
 
    for (i = 0; i < NTHREADS; i++) 
        pthread_join(tid[i], NULL);
 
    free(logpath);
    free(servname);
    return 0;
}
 
int
find_idx(const struct kevent *changelist, int nchanges, int ident)
{
    int i, ret = nchanges;
    for (i = 1; i < nchanges; i++) {
        if (changelist[i].ident == ident) {
            ret = i;
            break;
        }
    }
    return ret;
}
 
void *
work_thr(void *arg)
{
    int cfd, kfd;
    ssize_t n;
    int kqfd;
    struct kevent *changelist = (struct kevent *) malloc(sizeof(struct kevent) * MAXEVENTS);
    struct kevent *eventlist = (struct kevent *) malloc(sizeof(struct kevent) * MAXEVENTS);
    char *logbuf = (char *) malloc(LOGSIZE);
    int ret;
    int i;
    int flags;
    int ncfds = 0;
    struct timespec timeout;
 
    int idx;
    kqfd = kqueue();
    for (i = 0; i < MAXEVENTS; i++)
        EV_SET(&changelist[i], lfd, EVFILT_READ, EV_ADD | EV_ONESHOT, EV_ADD | EV_ONESHOT, 0, NULL);
 
    for (; ;) {
        if (ncfds == 0) {
            timeout.tv_sec = 10000;
            timeout.tv_nsec = 0;
        }
        else {
            timeout.tv_sec = 1;
            timeout.tv_nsec = 1000000;
        }
 
        ret = kevent(kqfd, changelist, MAXEVENTS, eventlist, MAXEVENTS, &timeout);
 
        for (i = 0; i < ret; i++) {
            kfd = eventlist[i].ident;
            if (lfd == kfd) {
                pthread_mutex_lock(&lfd_mtx);
                for (; ;) {
                    if ((cfd = accept(lfd, NULL, NULL)) >= 0) {
                        flags = fcntl(cfd, F_GETFL);
                        fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
                        ncfds++;
                        idx = find_idx(changelist, MAXEVENTS, lfd);
                        EV_SET(&changelist[idx], cfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
                        if (ncfds > MAXEVENTS - 1)
                            break;
                    } else {
                        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED) {
                            break;
                        }
                    }
                }
                EV_SET(&changelist[0], lfd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
                pthread_mutex_unlock(&lfd_mtx);
            } else {
                cfd = kfd;
                memset(logbuf, '\0', LOGSIZE);
                n = readn(cfd, logbuf, LOGSIZE);
                if (n == LOGSIZE) {
                    write(fd, logbuf, strlen(logbuf));
                } else {
                    ncfds--;
                    idx = find_idx(changelist, MAXEVENTS, cfd);
                    EV_SET(&changelist[idx], cfd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                    close(cfd);
                    EV_SET(&changelist[idx], lfd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
                }
            }
        }
    }
 
    free(changelist);
    free(eventlist);
    free(logbuf);
    close(kqfd);
    return NULL;
}

kqueue的功能要比epoll丰富一些, 但是API也要复杂一些. 和epoll一样, kqueue默认水平触发(flags字段设置EV_CLEAR则为边缘触发), 也支持ONESHOT. 大体上, server5.c和server5b.c的代码几乎一样. 但是由于kevent(等价于epoll_wait)需要把监视事件列表做为数组(一块连续的内存区域)作为输入, 而明显的由于客户端断开连接是乱序的, 无法保证输入changlist是连续. 解决办法就是客户端cfd断开后用lfd填充(见143行). 能够用这种方法的原因是, 在changelist中如果有多个元素有相同的(ident, filter)对, kevent把他们当做同一个时间处理, 并且只返回最后注册的那个. epoll的API在这点上设计比kqueue要合理一些, kqueue应该设置一个类似kqueue_ctl的接口来注册和注销监视事件.

此条目发表在计算机与网络技术分类目录,贴了, , 标签。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.