server.c (3823B)
1 /* See LICENSE file for copyright and license details. */ 2 #include <errno.h> 3 #include <pthread.h> 4 #include <stddef.h> 5 #include <stdlib.h> 6 #include <string.h> 7 8 #include "connection.h" 9 #include "queue.h" 10 #include "server.h" 11 #include "util.h" 12 13 struct worker_data { 14 int insock; 15 size_t nslots; 16 const struct server *srv; 17 }; 18 19 static void * 20 server_worker(void *data) 21 { 22 queue_event *event = NULL; 23 struct connection *connection, *c, *newc; 24 struct worker_data *d = (struct worker_data *)data; 25 int qfd; 26 ssize_t nready; 27 size_t i; 28 29 /* allocate connections */ 30 if (!(connection = calloc(d->nslots, sizeof(*connection)))) { 31 die("calloc:"); 32 } 33 34 /* create event queue */ 35 if ((qfd = queue_create()) < 0) { 36 exit(1); 37 } 38 39 /* add insock to the interest list (with data=NULL) */ 40 if (queue_add_fd(qfd, d->insock, QUEUE_EVENT_IN, 1, NULL) < 0) { 41 exit(1); 42 } 43 44 /* allocate event array */ 45 if (!(event = reallocarray(event, d->nslots, sizeof(*event)))) { 46 die("reallocarray:"); 47 } 48 49 for (;;) { 50 /* wait for new activity */ 51 if ((nready = queue_wait(qfd, event, d->nslots)) < 0) { 52 exit(1); 53 } 54 55 /* handle events */ 56 for (i = 0; i < (size_t)nready; i++) { 57 c = queue_event_get_data(&event[i]); 58 59 if (queue_event_is_error(&event[i])) { 60 if (c != NULL) { 61 queue_rem_fd(qfd, c->fd); 62 c->res.status = 0; 63 connection_log(c); 64 connection_reset(c); 65 } 66 67 continue; 68 } 69 70 if (c == NULL) { 71 /* add new connection to the interest list */ 72 if (!(newc = connection_accept(d->insock, 73 connection, 74 d->nslots))) { 75 /* 76 * the socket is either blocking 77 * or something failed. 78 * In both cases, we just carry on 79 */ 80 continue; 81 } 82 83 /* 84 * add event to the interest list 85 * (we want IN, because we start 86 * with receiving the header) 87 */ 88 if (queue_add_fd(qfd, newc->fd, 89 QUEUE_EVENT_IN, 90 0, newc) < 0) { 91 /* not much we can do here */ 92 continue; 93 } 94 } else { 95 /* serve existing connection */ 96 connection_serve(c, d->srv); 97 98 if (c->fd == 0) { 99 /* we are done */ 100 memset(c, 0, sizeof(struct connection)); 101 continue; 102 } 103 104 /* 105 * rearm the event based on the state 106 * we are "stuck" at 107 */ 108 switch(c->state) { 109 case C_RECV_HEADER: 110 if (queue_mod_fd(qfd, c->fd, 111 QUEUE_EVENT_IN, 112 c) < 0) { 113 connection_reset(c); 114 break; 115 } 116 break; 117 case C_SEND_HEADER: 118 case C_SEND_BODY: 119 if (queue_mod_fd(qfd, c->fd, 120 QUEUE_EVENT_OUT, 121 c) < 0) { 122 connection_reset(c); 123 break; 124 } 125 break; 126 default: 127 break; 128 } 129 } 130 } 131 } 132 133 return NULL; 134 } 135 136 void 137 server_init_thread_pool(int insock, size_t nthreads, size_t nslots, 138 const struct server *srv) 139 { 140 pthread_t *thread = NULL; 141 struct worker_data *d = NULL; 142 size_t i; 143 144 /* allocate worker_data structs */ 145 if (!(d = reallocarray(d, nthreads, sizeof(*d)))) { 146 die("reallocarray:"); 147 } 148 for (i = 0; i < nthreads; i++) { 149 d[i].insock = insock; 150 d[i].nslots = nslots; 151 d[i].srv = srv; 152 } 153 154 /* allocate and initialize thread pool */ 155 if (!(thread = reallocarray(thread, nthreads, sizeof(*thread)))) { 156 die("reallocarray:"); 157 } 158 for (i = 0; i < nthreads; i++) { 159 if (pthread_create(&thread[i], NULL, server_worker, &d[i]) != 0) { 160 if (errno == EAGAIN) { 161 die("You need to run as root or have " 162 "CAP_SYS_RESOURCE set, or are trying " 163 "to create more threads than the " 164 "system can offer"); 165 } else { 166 die("pthread_create:"); 167 } 168 } 169 } 170 171 /* wait for threads */ 172 for (i = 0; i < nthreads; i++) { 173 if ((errno = pthread_join(thread[i], NULL))) { 174 warn("pthread_join:"); 175 } 176 } 177 }