UNIX网络编程从零起步(4): 预先创建线程(pre-threaded)服务器端之主线程accept

虽然per thread的并发模型对中小程度的应用来说是足够, 但是我们需要考虑更大规模的应用. 很显然, 每个连接都创建一个线程, 就会有创建线程的延迟. 当链接数目过多时, 这种延迟就显现出来了. 而且每一个线程在默认下系统都会分配2M(32位的Linux系统如此)的线程栈空间, 当服务器内存有限时, 服务器也不可能同时运行太多的线程.

延迟的问题, 解决的办法就是在客户端发起连接之前就预先创建好一个工作线程池, 当有连接发生时, 线程池中的一个线程来处理这个连接. 我们的第一个pre-threaded的模型是最简单的主线程accept, 然后将cfd分配一个工作线程. 代码如下:
server3.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include "server.h"
 
#define MAXCLI 512
#define NTHREADS 128
 
int fd;
int cfd_queue[MAXCLI], head = 0, tail = 0;
pthread_mutex_t cfd_queue_mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cfd_queue_cond = PTHREAD_COND_INITIALIZER;
 
void *work_thr(void *);
 
 
int
main(int argc, char *argv[])
{
    int i;
    int lfd, cfd;
    char *logpath = (char *) malloc(PATHSIZE);
    char *servname = (char *) malloc(SERVNAMESIZE);
    pthread_t tid;
 
    if (argc == 1) {
        snprintf(logpath, PATHSIZE, "server.log");
        snprintf(servname, SERVNAMESIZE, "5000");
    } else if (argc == 2) {
        snprintf(logpath, PATHSIZE, "server.log");
        snprintf(servname, SERVNAMESIZE, "%s", argv[1]);
    } else if (argc == 3) {
        snprintf(logpath, PATHSIZE, "%s", argv[1]);
        snprintf(servname, SERVNAMESIZE, "%s", argv[2]);
    } else {
        fprintf(stderr, "Usage:\n");
        fprintf(stderr, "%s [log file path] [server port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }
 
    if ((fd = open_logfile(logpath)) < 0) {
        fprintf(stderr, "open_logfd error.\n");
        exit(EXIT_FAILURE);
    }
 
    if ((lfd = tcp_listen(NULL, servname, NULL, NULL)) < 0) {
        fprintf(stderr, "tcp_listen error.\n");
        exit(EXIT_FAILURE);
    }
 
    for (i = 0; i < NTHREADS; i++)
        pthread_create(&tid, NULL, work_thr, NULL);
 
    for (;;) {
        cfd = accept(lfd, NULL, NULL);
        pthread_mutex_lock(&cfd_queue_mtx);
        cfd_queue[tail] = cfd;
        if (++tail == MAXCLI)
            tail = 0;
        if (tail == head) {
            fprintf(stderr, "socket connecting queue is full.\n");
            exit(EXIT_FAILURE);
        }
        pthread_cond_signal(&cfd_queue_cond);
        pthread_mutex_unlock(&cfd_queue_mtx);
    }
    free(logpath);
    free(servname);
    return 0;
}
 
void *
work_thr(void *arg)
{
    pthread_detach(pthread_self()); 
    int cfd;
    ssize_t n;
 
    char *logbuf = (char *) malloc(LOGSIZE);
 
    for (; ;) {
        pthread_mutex_lock(&cfd_queue_mtx);
        while (head == tail)
            pthread_cond_wait(&cfd_queue_cond, &cfd_queue_mtx);
        cfd = cfd_queue[head];
        if (++head == MAXCLI)
            head = 0;
        pthread_mutex_unlock(&cfd_queue_mtx);
 
        for (; ;) {
            memset(logbuf, '\0', LOGSIZE);
            n = readn(cfd, logbuf, LOGSIZE);
            if (n != LOGSIZE && n != 0) {
                fprintf(stderr, "readn error.\n");
                break;
            } else if (n == 0) {
                fprintf(stderr, "connection is closed.\n");
                break;
            } else { 
                write(fd, logbuf, strlen(logbuf));
            }
        }
        close(cfd);
    }
    free(logbuf);
    return NULL;
}

线程是在连接之前创建, 所以在第49行和per thread不一样, cfd不可能在创建线程的时候通过pthread_create传递. 在第52行连接发生的时候, 主线程必须要将处理连接委派给一个工作子线程. 我们希望大致上先发起的连接先处理, 所以通过全局的共享队列的数据结构来委派任务最为合适. 我们使用最简单的用数组实现队列(见第7行). 主线程是生产者, 向cfd队列里添加任务;线程池的工作线程是消费者, 从cfd队列里取出任务并处理. 当队列为空时, 子线程就挂起等待新的任务(见81行); 而当连接发起时, 若还有子线程挂起, 主线程只唤醒其中的一个(见61行).

编译server3.c

gcc server3.c tcp_listen.c readn.c open_logfile.c -lpthread -Wall -O2 -o serv3

用测试脚本测试结果与per thread类似.

此条目发表在计算机与网络技术分类目录,贴了, , 标签。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.