s3-prefork: Add parent->client messaging
[metze/samba/wip.git] / source3 / lib / server_prefork.c
1 /*
2    Unix SMB/CIFS implementation.
3    Common server globals
4
5    Copyright (C) Simo Sorce <idra@samba.org> 2011
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "serverid.h"
23 #include "messages.h"
24 #include "system/time.h"
25 #include "system/shmem.h"
26 #include "system/filesys.h"
27 #include "server_prefork.h"
28 #include "../lib/util/util.h"
29 #include "../lib/util/tevent_unix.h"
30
31 struct prefork_pool {
32
33         int listen_fd_size;
34         int *listen_fds;
35
36         prefork_main_fn_t *main_fn;
37         void *private_data;
38
39         int pool_size;
40         struct pf_worker_data *pool;
41
42         int allowed_clients;
43
44         prefork_sigchld_fn_t *sigchld_fn;
45         void *sigchld_data;
46 };
47
48 static bool prefork_setup_sigchld_handler(struct tevent_context *ev_ctx,
49                                             struct prefork_pool *pfp);
50
51 static int prefork_pool_destructor(struct prefork_pool *pfp)
52 {
53         anonymous_shared_free(pfp->pool);
54         return 0;
55 }
56
57 bool prefork_create_pool(TALLOC_CTX *mem_ctx,
58                          struct tevent_context *ev_ctx,
59                          struct messaging_context *msg_ctx,
60                          int listen_fd_size, int *listen_fds,
61                          int min_children, int max_children,
62                          prefork_main_fn_t *main_fn, void *private_data,
63                          struct prefork_pool **pf_pool)
64 {
65         struct prefork_pool *pfp;
66         pid_t pid;
67         time_t now = time(NULL);
68         size_t data_size;
69         int ret;
70         int i;
71         bool ok;
72
73         pfp = talloc_zero(mem_ctx, struct prefork_pool);
74         if (!pfp) {
75                 DEBUG(1, ("Out of memory!\n"));
76                 return false;
77         }
78         pfp->listen_fd_size = listen_fd_size;
79         pfp->listen_fds = talloc_array(pfp, int, listen_fd_size);
80         if (!pfp->listen_fds) {
81                 DEBUG(1, ("Out of memory!\n"));
82                 return false;
83         }
84         for (i = 0; i < listen_fd_size; i++) {
85                 pfp->listen_fds[i] = listen_fds[i];
86         }
87         pfp->main_fn = main_fn;
88         pfp->private_data = private_data;
89
90         pfp->pool_size = max_children;
91         data_size = sizeof(struct pf_worker_data) * max_children;
92
93         pfp->pool = anonymous_shared_allocate(data_size);
94         if (pfp->pool == NULL) {
95                 DEBUG(1, ("Failed to mmap memory for prefork pool!\n"));
96                 talloc_free(pfp);
97                 return false;
98         }
99         talloc_set_destructor(pfp, prefork_pool_destructor);
100
101         for (i = 0; i < min_children; i++) {
102
103                 pfp->pool[i].allowed_clients = 1;
104                 pfp->pool[i].started = now;
105
106                 pid = sys_fork();
107                 switch (pid) {
108                 case -1:
109                         DEBUG(1, ("Failed to prefork child n. %d !\n", i));
110                         break;
111
112                 case 0: /* THE CHILD */
113
114                         pfp->pool[i].status = PF_WORKER_ALIVE;
115                         ret = pfp->main_fn(ev_ctx, msg_ctx,
116                                            &pfp->pool[i], i + 1,
117                                            pfp->listen_fd_size,
118                                            pfp->listen_fds,
119                                            pfp->private_data);
120                         exit(ret);
121
122                 default: /* THE PARENT */
123                         pfp->pool[i].pid = pid;
124                         break;
125                 }
126         }
127
128         ok = prefork_setup_sigchld_handler(ev_ctx, pfp);
129         if (!ok) {
130                 DEBUG(1, ("Failed to setup SIGCHLD Handler!\n"));
131                 talloc_free(pfp);
132                 return false;
133         }
134
135         *pf_pool = pfp;
136         return true;
137 }
138
139 /* Provide the new max children number in new_max
140  * (must be larger than current max).
141  * Returns: 0 if all fine
142  *          ENOSPC if mremap fails to expand
143  *          EINVAL if new_max is invalid
144  */
145 int prefork_expand_pool(struct prefork_pool *pfp, int new_max)
146 {
147         struct prefork_pool *pool;
148         size_t old_size;
149         size_t new_size;
150         int ret;
151
152         if (new_max <= pfp->pool_size) {
153                 return EINVAL;
154         }
155
156         old_size = sizeof(struct pf_worker_data) * pfp->pool_size;
157         new_size = sizeof(struct pf_worker_data) * new_max;
158
159         pool = anonymous_shared_resize(&pfp->pool, new_size, false);
160         if (pool == NULL) {
161                 ret = errno;
162                 DEBUG(3, ("Failed to mremap memory (%d: %s)!\n",
163                           ret, strerror(ret)));
164                 return ret;
165         }
166
167         memset(&pool[pfp->pool_size], 0, new_size - old_size);
168
169         pfp->pool_size = new_max;
170
171         return 0;
172 }
173
174 int prefork_add_children(struct tevent_context *ev_ctx,
175                          struct messaging_context *msg_ctx,
176                          struct prefork_pool *pfp,
177                          int num_children)
178 {
179         pid_t pid;
180         time_t now = time(NULL);
181         int ret;
182         int i, j;
183
184         for (i = 0, j = 0; i < pfp->pool_size && j < num_children; i++) {
185
186                 if (pfp->pool[i].status != PF_WORKER_NONE) {
187                         continue;
188                 }
189
190                 pfp->pool[i].allowed_clients = 1;
191                 pfp->pool[i].started = now;
192
193                 pid = sys_fork();
194                 switch (pid) {
195                 case -1:
196                         DEBUG(1, ("Failed to prefork child n. %d !\n", j));
197                         break;
198
199                 case 0: /* THE CHILD */
200
201                         pfp->pool[i].status = PF_WORKER_ALIVE;
202                         ret = pfp->main_fn(ev_ctx, msg_ctx,
203                                            &pfp->pool[i], i + 1,
204                                            pfp->listen_fd_size,
205                                            pfp->listen_fds,
206                                            pfp->private_data);
207
208                         pfp->pool[i].status = PF_WORKER_EXITING;
209                         exit(ret);
210
211                 default: /* THE PARENT */
212                         pfp->pool[i].pid = pid;
213                         j++;
214                         break;
215                 }
216         }
217
218         DEBUG(5, ("Added %d children!\n", j));
219
220         return j;
221 }
222
223 struct prefork_oldest {
224         int num;
225         time_t started;
226 };
227
228 /* sort in inverse order */
229 static int prefork_sort_oldest(const void *ap, const void *bp)
230 {
231         const struct prefork_oldest *a = (const struct prefork_oldest *)ap;
232         const struct prefork_oldest *b = (const struct prefork_oldest *)bp;
233
234         if (a->started == b->started) {
235                 return 0;
236         }
237         if (a->started < b->started) {
238                 return 1;
239         }
240         return -1;
241 }
242
243 int prefork_retire_children(struct prefork_pool *pfp,
244                             int num_children, time_t age_limit)
245 {
246         time_t now = time(NULL);
247         struct prefork_oldest *oldest;
248         int i, j;
249
250         oldest = talloc_array(pfp, struct prefork_oldest, pfp->pool_size);
251         if (!oldest) {
252                 return -1;
253         }
254
255         for (i = 0; i < pfp->pool_size; i++) {
256                 oldest[i].num = i;
257                 if (pfp->pool[i].status == PF_WORKER_ALIVE ||
258                     pfp->pool[i].status == PF_WORKER_ACCEPTING) {
259                         oldest[i].started = pfp->pool[i].started;
260                 } else {
261                         oldest[i].started = now;
262                 }
263         }
264
265         qsort(oldest, pfp->pool_size,
266                 sizeof(struct prefork_oldest),
267                 prefork_sort_oldest);
268
269         for (i = 0, j = 0; i < pfp->pool_size && j < num_children; i++) {
270                 if ((pfp->pool[i].status == PF_WORKER_ALIVE ||
271                      pfp->pool[i].status == PF_WORKER_ACCEPTING) &&
272                     pfp->pool[i].started <= age_limit) {
273                         /* tell the child it's time to give up */
274                         DEBUG(5, ("Retiring pid %d!\n", pfp->pool[i].pid));
275                         pfp->pool[i].cmds = PF_SRV_MSG_EXIT;
276                         kill(pfp->pool[i].pid, SIGHUP);
277                         j++;
278                 }
279         }
280
281         return j;
282 }
283
284 int prefork_count_children(struct prefork_pool *pfp, int *active)
285 {
286         int i, a, t;
287
288         a = 0;
289         t = 0;
290         for (i = 0; i < pfp->pool_size; i++) {
291                 if (pfp->pool[i].status == PF_WORKER_NONE) {
292                         continue;
293                 }
294
295                 t++;
296
297                 if ((pfp->pool[i].status == PF_WORKER_EXITING) ||
298                     (pfp->pool[i].num_clients <= 0)) {
299                         continue;
300                 }
301
302                 a++;
303         }
304
305         if (active) {
306                 *active = a;
307         }
308         return t;
309 }
310
311 static void prefork_cleanup_loop(struct prefork_pool *pfp)
312 {
313         int status;
314         pid_t pid;
315         int i;
316
317         /* TODO: should we use a process group id wait instead of looping ? */
318         for (i = 0; i < pfp->pool_size; i++) {
319                 if (pfp->pool[i].status == PF_WORKER_NONE ||
320                     pfp->pool[i].pid == 0) {
321                         continue;
322                 }
323
324                 pid = sys_waitpid(pfp->pool[i].pid, &status, WNOHANG);
325                 if (pid > 0) {
326
327                         if (pfp->pool[i].status != PF_WORKER_EXITING) {
328                                 DEBUG(3, ("Child (%d) terminated abnormally:"
329                                           " %d\n", (int)pid, status));
330                         } else {
331                                 DEBUG(10, ("Child (%d) terminated with status:"
332                                            " %d\n", (int)pid, status));
333                         }
334
335                         /* reset all fields,
336                          * this makes status = PF_WORK_NONE */
337                         memset(&pfp->pool[i], 0,
338                                 sizeof(struct pf_worker_data));
339                 }
340         }
341
342 }
343
344 int prefork_count_allowed_connections(struct prefork_pool *pfp)
345 {
346         int c;
347         int i;
348
349         c = 0;
350         for (i = 0; i < pfp->pool_size; i++) {
351                 if (pfp->pool[i].status == PF_WORKER_NONE ||
352                     pfp->pool[i].status == PF_WORKER_EXITING) {
353                         continue;
354                 }
355
356                 if (pfp->pool[i].num_clients < 0) {
357                         continue;
358                 }
359
360                 c += pfp->pool[i].allowed_clients - pfp->pool[i].num_clients;
361         }
362
363         return c;
364 }
365
366 void prefork_increase_allowed_clients(struct prefork_pool *pfp, int max)
367 {
368         int i;
369
370         for (i = 0; i < pfp->pool_size; i++) {
371                 if (pfp->pool[i].status == PF_WORKER_NONE ||
372                     pfp->pool[i].status == PF_WORKER_EXITING) {
373                         continue;
374                 }
375
376                 if (pfp->pool[i].num_clients < 0) {
377                         continue;
378                 }
379
380                 if (pfp->pool[i].allowed_clients < max) {
381                         pfp->pool[i].allowed_clients++;
382                 }
383         }
384 }
385
386 void prefork_decrease_allowed_clients(struct prefork_pool *pfp)
387 {
388         int i;
389
390         for (i = 0; i < pfp->pool_size; i++) {
391                 if (pfp->pool[i].status == PF_WORKER_NONE ||
392                     pfp->pool[i].status == PF_WORKER_EXITING) {
393                         continue;
394                 }
395
396                 if (pfp->pool[i].num_clients < 0) {
397                         continue;
398                 }
399
400                 if (pfp->pool[i].allowed_clients > 1) {
401                         pfp->pool[i].allowed_clients--;
402                 }
403         }
404 }
405
406 void prefork_reset_allowed_clients(struct prefork_pool *pfp)
407 {
408         int i;
409
410         for (i = 0; i < pfp->pool_size; i++) {
411                 pfp->pool[i].allowed_clients = 1;
412         }
413 }
414
415 void prefork_send_signal_to_all(struct prefork_pool *pfp, int signal_num)
416 {
417         int i;
418
419         for (i = 0; i < pfp->pool_size; i++) {
420                 if (pfp->pool[i].status == PF_WORKER_NONE) {
421                         continue;
422                 }
423
424                 kill(pfp->pool[i].pid, signal_num);
425         }
426 }
427
428 void prefork_warn_active_children(struct messaging_context *msg_ctx,
429                                   struct prefork_pool *pfp)
430 {
431         const DATA_BLOB ping = data_blob_null;
432         int i;
433
434         for (i = 0; i < pfp->pool_size; i++) {
435                 if (pfp->pool[i].status == PF_WORKER_NONE) {
436                         continue;
437                 }
438
439                 messaging_send(msg_ctx,
440                                 pid_to_procid(pfp->pool[i].pid),
441                                 MSG_PREFORK_PARENT_EVENT, &ping);
442         }
443 }
444
445 static void prefork_sigchld_handler(struct tevent_context *ev_ctx,
446                                     struct tevent_signal *se,
447                                     int signum, int count,
448                                     void *siginfo, void *pvt)
449 {
450         struct prefork_pool *pfp;
451
452         pfp = talloc_get_type_abort(pvt, struct prefork_pool);
453
454         /* run the cleanup function to make sure all dead children are
455          * properly and timely retired. */
456         prefork_cleanup_loop(pfp);
457
458         if (pfp->sigchld_fn) {
459                 pfp->sigchld_fn(ev_ctx, pfp, pfp->sigchld_data);
460         }
461 }
462
463 static bool prefork_setup_sigchld_handler(struct tevent_context *ev_ctx,
464                                           struct prefork_pool *pfp)
465 {
466         struct tevent_signal *se;
467
468         se = tevent_add_signal(ev_ctx, pfp, SIGCHLD, 0,
469                                 prefork_sigchld_handler, pfp);
470         if (!se) {
471                 DEBUG(0, ("Failed to setup SIGCHLD handler!\n"));
472                 return false;
473         }
474
475         return true;
476 }
477
478 void prefork_set_sigchld_callback(struct prefork_pool *pfp,
479                                   prefork_sigchld_fn_t *sigchld_fn,
480                                   void *private_data)
481 {
482         pfp->sigchld_fn = sigchld_fn;
483         pfp->sigchld_data = private_data;
484 }
485
486 /* ==== Functions used by children ==== */
487
488 struct pf_listen_state {
489         struct tevent_context *ev;
490         struct pf_worker_data *pf;
491
492         int listen_fd_size;
493         int *listen_fds;
494
495         int accept_fd;
496
497         struct tsocket_address *srv_addr;
498         struct tsocket_address *cli_addr;
499
500         int error;
501 };
502
503 struct pf_listen_ctx {
504         TALLOC_CTX *fde_ctx;
505         struct tevent_req *req;
506         int listen_fd;
507 };
508
509 static void prefork_listen_accept_handler(struct tevent_context *ev,
510                                           struct tevent_fd *fde,
511                                           uint16_t flags, void *pvt);
512
513 struct tevent_req *prefork_listen_send(TALLOC_CTX *mem_ctx,
514                                         struct tevent_context *ev,
515                                         struct pf_worker_data *pf,
516                                         int listen_fd_size,
517                                         int *listen_fds)
518 {
519         struct tevent_req *req;
520         struct pf_listen_state *state;
521         struct pf_listen_ctx *ctx;
522         struct tevent_fd *fde;
523         TALLOC_CTX *fde_ctx;
524         int i;
525
526         req = tevent_req_create(mem_ctx, &state, struct pf_listen_state);
527         if (!req) {
528                 return NULL;
529         }
530
531         state->ev = ev;
532         state->pf = pf;
533         state->listen_fd_size = listen_fd_size;
534         state->listen_fds = listen_fds;
535         state->accept_fd = -1;
536         state->error = 0;
537
538         fde_ctx = talloc_new(state);
539         if (tevent_req_nomem(fde_ctx, req)) {
540                 return tevent_req_post(req, ev);
541         }
542
543         /* race on accept */
544         for (i = 0; i < state->listen_fd_size; i++) {
545                 ctx = talloc(fde_ctx, struct pf_listen_ctx);
546                 if (tevent_req_nomem(ctx, req)) {
547                         return tevent_req_post(req, ev);
548                 }
549                 ctx->fde_ctx = fde_ctx;
550                 ctx->req = req;
551                 ctx->listen_fd = state->listen_fds[i];
552
553                 fde = tevent_add_fd(state->ev, fde_ctx,
554                                     ctx->listen_fd, TEVENT_FD_READ,
555                                     prefork_listen_accept_handler, ctx);
556                 if (tevent_req_nomem(fde, req)) {
557                         return tevent_req_post(req, ev);
558                 }
559         }
560
561         pf->status = PF_WORKER_ACCEPTING;
562
563         return req;
564 }
565
566 static void prefork_listen_accept_handler(struct tevent_context *ev,
567                                           struct tevent_fd *fde,
568                                           uint16_t flags, void *pvt)
569 {
570         struct pf_listen_state *state;
571         struct tevent_req *req;
572         struct pf_listen_ctx *ctx;
573         struct sockaddr_storage addr;
574         socklen_t addrlen;
575         int soerr = 0;
576         socklen_t solen = sizeof(soerr);
577         int sd = -1;
578         int ret;
579
580         ctx = talloc_get_type_abort(pvt, struct pf_listen_ctx);
581         req = ctx->req;
582         state = tevent_req_data(ctx->req, struct pf_listen_state);
583
584         if (state->pf->cmds == PF_SRV_MSG_EXIT) {
585                 /* We have been asked to exit, so drop here and the next
586                  * child will pick it up */
587                 if (state->pf->num_clients <= 0) {
588                         state->pf->status = PF_WORKER_EXITING;
589                 }
590                 state->error = EINTR;
591                 goto done;
592         }
593
594         /* before proceeding check that the listening fd is ok */
595         ret = getsockopt(ctx->listen_fd, SOL_SOCKET, SO_ERROR, &soerr, &solen);
596         if (ret == -1) {
597                 /* this is a fatal error, we cannot continue listening */
598                 state->error = EBADF;
599                 goto done;
600         }
601         if (soerr != 0) {
602                 /* this is a fatal error, we cannot continue listening */
603                 state->error = soerr;
604                 goto done;
605         }
606
607         ZERO_STRUCT(addr);
608         addrlen = sizeof(addr);
609         sd = accept(ctx->listen_fd, (struct sockaddr *)&addr, &addrlen);
610         if (sd == -1) {
611                 state->error = errno;
612                 DEBUG(6, ("Accept failed! (%d, %s)\n",
613                           state->error, strerror(state->error)));
614                 goto done;
615         }
616
617         state->accept_fd = sd;
618
619         ret = tsocket_address_bsd_from_sockaddr(state,
620                                         (struct sockaddr *)(void *)&addr,
621                                         addrlen, &state->cli_addr);
622         if (ret < 0) {
623                 state->error = errno;
624                 goto done;
625         }
626
627         ZERO_STRUCT(addr);
628         addrlen = sizeof(addr);
629         ret = getsockname(sd, (struct sockaddr *)(void *)&addr, &addrlen);
630         if (ret < 0) {
631                 state->error = errno;
632                 goto done;
633         }
634
635         ret = tsocket_address_bsd_from_sockaddr(state,
636                                         (struct sockaddr *)(void *)&addr,
637                                         addrlen, &state->srv_addr);
638         if (ret < 0) {
639                 state->error = errno;
640                 goto done;
641         }
642
643 done:
644         /* do not track the listen fds anymore */
645         talloc_free(ctx->fde_ctx);
646         tevent_req_done(req);
647 }
648
649 int prefork_listen_recv(struct tevent_req *req,
650                         TALLOC_CTX *mem_ctx, int *fd,
651                         struct tsocket_address **srv_addr,
652                         struct tsocket_address **cli_addr)
653 {
654         struct pf_listen_state *state;
655         int ret = 0;
656
657         state = tevent_req_data(req, struct pf_listen_state);
658
659         if (state->error) {
660                 ret = state->error;
661         } else {
662                 tevent_req_is_unix_error(req, &ret);
663         }
664
665         if (ret) {
666                 if (state->accept_fd != -1) {
667                         close(state->accept_fd);
668                 }
669         } else {
670                 *fd = state->accept_fd;
671                 *srv_addr = talloc_move(mem_ctx, &state->srv_addr);
672                 *cli_addr = talloc_move(mem_ctx, &state->cli_addr);
673                 state->pf->num_clients++;
674         }
675         if (state->pf->status == PF_WORKER_ACCEPTING) {
676                 state->pf->status = PF_WORKER_ALIVE;
677         }
678
679         tevent_req_received(req);
680         return ret;
681 }