ctdb-tests: Implement controls related to tunables in fake_ctdbd
[samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23
24 #include <popt.h>
25 #include <talloc.h>
26 #include <tevent.h>
27 #include <tdb.h>
28
29 #include "lib/util/dlinklist.h"
30 #include "lib/util/tevent_unix.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/async_req/async_sock.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37
38 #include "common/comm.h"
39 #include "common/system.h"
40 #include "common/logging.h"
41 #include "common/tunable.h"
42
43
44 #define CTDB_PORT 4379
45
46 /* A fake flag that is only supported by some functions */
47 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
48
49 struct node {
50         ctdb_sock_addr addr;
51         uint32_t pnn;
52         uint32_t flags;
53         uint32_t capabilities;
54         bool recovery_disabled;
55         void *recovery_substate;
56 };
57
58 struct node_map {
59         uint32_t num_nodes;
60         struct node *node;
61         uint32_t pnn;
62         uint32_t recmaster;
63 };
64
65 struct interface {
66         const char *name;
67         bool link_up;
68         uint32_t references;
69 };
70
71 struct interface_map {
72         int num;
73         struct interface *iface;
74 };
75
76 struct vnn_map {
77         uint32_t recmode;
78         uint32_t generation;
79         uint32_t size;
80         uint32_t *map;
81 };
82
83 struct srvid_register_state {
84         struct srvid_register_state *prev, *next;
85         struct ctdbd_context *ctdb;
86         uint64_t srvid;
87 };
88
89 struct ctdbd_context {
90         struct node_map *node_map;
91         struct interface_map *iface_map;
92         struct vnn_map *vnn_map;
93         struct srvid_register_state *rstate;
94         int num_clients;
95         struct timeval start_time;
96         struct timeval recovery_start_time;
97         struct timeval recovery_end_time;
98         bool takeover_disabled;
99         enum debug_level log_level;
100         enum ctdb_runstate runstate;
101         struct ctdb_tunable_list tun_list;
102 };
103
104 /*
105  * Parse routines
106  */
107
108 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
109 {
110         struct node_map *node_map;
111
112         node_map = talloc_zero(mem_ctx, struct node_map);
113         if (node_map == NULL) {
114                 return NULL;
115         }
116
117         node_map->pnn = CTDB_UNKNOWN_PNN;
118         node_map->recmaster = CTDB_UNKNOWN_PNN;
119
120         return node_map;
121 }
122
123 /* Read a nodemap from stdin.  Each line looks like:
124  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
125  * EOF or a blank line terminates input.
126  *
127  * By default, capablities for each node are
128  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
129  * capabilities can be faked off by adding, for example,
130  * -CTDB_CAP_RECMASTER.
131  */
132
133 static bool nodemap_parse(struct node_map *node_map)
134 {
135         char line[1024];
136
137         while ((fgets(line, sizeof(line), stdin) != NULL)) {
138                 uint32_t pnn, flags, capabilities;
139                 char *tok, *t;
140                 char *ip;
141                 ctdb_sock_addr saddr;
142                 struct node *node;
143
144                 if (line[0] == '\n') {
145                         break;
146                 }
147
148                 /* Get rid of pesky newline */
149                 if ((t = strchr(line, '\n')) != NULL) {
150                         *t = '\0';
151                 }
152
153                 /* Get PNN */
154                 tok = strtok(line, " \t");
155                 if (tok == NULL) {
156                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
157                         continue;
158                 }
159                 pnn = (uint32_t)strtoul(tok, NULL, 0);
160
161                 /* Get IP */
162                 tok = strtok(NULL, " \t");
163                 if (tok == NULL) {
164                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
165                         continue;
166                 }
167                 if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
168                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
169                         continue;
170                 }
171                 ip = talloc_strdup(node_map, tok);
172                 if (ip == NULL) {
173                         goto fail;
174                 }
175
176                 /* Get flags */
177                 tok = strtok(NULL, " \t");
178                 if (tok == NULL) {
179                         fprintf(stderr, "bad line (%s) - missing flags\n",
180                                 line);
181                         continue;
182                 }
183                 flags = (uint32_t)strtoul(tok, NULL, 0);
184                 /* Handle deleted nodes */
185                 if (flags & NODE_FLAGS_DELETED) {
186                         talloc_free(ip);
187                         ip = talloc_strdup(node_map, "0.0.0.0");
188                         if (ip == NULL) {
189                                 goto fail;
190                         }
191                 }
192                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
193
194                 tok = strtok(NULL, " \t");
195                 while (tok != NULL) {
196                         if (strcmp(tok, "CURRENT") == 0) {
197                                 node_map->pnn = pnn;
198                         } else if (strcmp(tok, "RECMASTER") == 0) {
199                                 node_map->recmaster = pnn;
200                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
201                                 capabilities &= ~CTDB_CAP_RECMASTER;
202                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
203                                 capabilities &= ~CTDB_CAP_LMASTER;
204                         } else if (strcmp(tok, "TIMEOUT") == 0) {
205                                 /* This can be done with just a flag
206                                  * value but it is probably clearer
207                                  * and less error-prone to fake this
208                                  * with an explicit token */
209                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
210                         }
211                         tok = strtok(NULL, " \t");
212                 }
213
214                 node_map->node = talloc_realloc(node_map, node_map->node,
215                                                 struct node,
216                                                 node_map->num_nodes + 1);
217                 if (node_map->node == NULL) {
218                         goto fail;
219                 }
220                 node = &node_map->node[node_map->num_nodes];
221
222                 parse_ip(ip, NULL, CTDB_PORT, &node->addr);
223                 node->pnn = pnn;
224                 node->flags = flags;
225                 node->capabilities = capabilities;
226                 node->recovery_disabled = false;
227                 node->recovery_substate = NULL;
228
229                 node_map->num_nodes += 1;
230         }
231
232         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
233         return true;
234
235 fail:
236         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
237         return false;
238
239 }
240
241 /* Append a node to a node map with given address and flags */
242 static bool node_map_add(struct ctdb_node_map *nodemap,
243                          const char *nstr, uint32_t flags)
244 {
245         ctdb_sock_addr addr;
246         uint32_t num;
247         struct ctdb_node_and_flags *n;
248
249         if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
250                 fprintf(stderr, "Invalid IP address %s\n", nstr);
251                 return false;
252         }
253
254         num = nodemap->num;
255         nodemap->node = talloc_realloc(nodemap, nodemap->node,
256                                        struct ctdb_node_and_flags, num+1);
257         if (nodemap->node == NULL) {
258                 return false;
259         }
260
261         n = &nodemap->node[num];
262         n->addr = addr;
263         n->pnn = num;
264         n->flags = flags;
265
266         nodemap->num = num+1;
267         return true;
268 }
269
270 /* Read a nodes file into a node map */
271 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
272                                                   const char *nlist)
273 {
274         char **lines;
275         int nlines;
276         int i;
277         struct ctdb_node_map *nodemap;
278
279         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
280         if (nodemap == NULL) {
281                 return NULL;
282         }
283
284         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
285         if (lines == NULL) {
286                 return NULL;
287         }
288
289         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
290                 nlines--;
291         }
292
293         for (i=0; i<nlines; i++) {
294                 char *node;
295                 uint32_t flags;
296                 size_t len;
297
298                 node = lines[i];
299                 /* strip leading spaces */
300                 while((*node == ' ') || (*node == '\t')) {
301                         node++;
302                 }
303
304                 len = strlen(node);
305
306                 /* strip trailing spaces */
307                 while ((len > 1) &&
308                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
309                 {
310                         node[len-1] = '\0';
311                         len--;
312                 }
313
314                 if (len == 0) {
315                         continue;
316                 }
317                 if (*node == '#') {
318                         /* A "deleted" node is a node that is
319                            commented out in the nodes file.  This is
320                            used instead of removing a line, which
321                            would cause subsequent nodes to change
322                            their PNN. */
323                         flags = NODE_FLAGS_DELETED;
324                         node = discard_const("0.0.0.0");
325                 } else {
326                         flags = 0;
327                 }
328                 if (! node_map_add(nodemap, node, flags)) {
329                         talloc_free(lines);
330                         TALLOC_FREE(nodemap);
331                         return NULL;
332                 }
333         }
334
335         talloc_free(lines);
336         return nodemap;
337 }
338
339 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
340                                              uint32_t pnn)
341 {
342         struct ctdb_node_map *nodemap;
343         char nodepath[PATH_MAX];
344         const char *nodes_list;
345
346         /* read the nodes file */
347         sprintf(nodepath, "CTDB_NODES_%u", pnn);
348         nodes_list = getenv(nodepath);
349         if (nodes_list == NULL) {
350                 nodes_list = getenv("CTDB_NODES");
351                 if (nodes_list == NULL) {
352                         DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
353                         return NULL;
354                 }
355         }
356
357         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
358         if (nodemap == NULL) {
359                 DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
360                                    nodes_list));
361                 return NULL;
362         }
363
364         return nodemap;
365 }
366
367 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
368 {
369         struct interface_map *iface_map;
370
371         iface_map = talloc_zero(mem_ctx, struct interface_map);
372         if (iface_map == NULL) {
373                 return NULL;
374         }
375
376         return iface_map;
377 }
378
379 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
380  * output:
381  *   :Name:LinkStatus:References:
382  *   :eth2:1:4294967294
383  *   :eth1:1:4294967292
384  */
385
386 static bool interfaces_parse(struct interface_map *iface_map)
387 {
388         char line[1024];
389
390         while ((fgets(line, sizeof(line), stdin) != NULL)) {
391                 uint16_t link_state;
392                 uint32_t references;
393                 char *tok, *t, *name;
394                 struct interface *iface;
395
396                 if (line[0] == '\n') {
397                         break;
398                 }
399
400                 /* Get rid of pesky newline */
401                 if ((t = strchr(line, '\n')) != NULL) {
402                         *t = '\0';
403                 }
404
405                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
406                         continue;
407                 }
408
409                 /* Leading colon... */
410                 // tok = strtok(line, ":");
411
412                 /* name */
413                 tok = strtok(line, ":");
414                 if (tok == NULL) {
415                         fprintf(stderr, "bad line (%s) - missing name\n", line);
416                         continue;
417                 }
418                 name = tok;
419
420                 /* link_state */
421                 tok = strtok(NULL, ":");
422                 if (tok == NULL) {
423                         fprintf(stderr, "bad line (%s) - missing link state\n",
424                                 line);
425                         continue;
426                 }
427                 link_state = (uint16_t)strtoul(tok, NULL, 0);
428
429                 /* references... */
430                 tok = strtok(NULL, ":");
431                 if (tok == NULL) {
432                         fprintf(stderr, "bad line (%s) - missing references\n",
433                                 line);
434                         continue;
435                 }
436                 references = (uint32_t)strtoul(tok, NULL, 0);
437
438                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
439                                                   struct interface,
440                                                   iface_map->num + 1);
441                 if (iface_map->iface == NULL) {
442                         goto fail;
443                 }
444
445                 iface = &iface_map->iface[iface_map->num];
446
447                 iface->name = talloc_strdup(iface_map, name);
448                 if (iface->name == NULL) {
449                         goto fail;
450                 }
451                 iface->link_up = link_state;
452                 iface->references = references;
453
454                 iface_map->num += 1;
455         }
456
457         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
458         return true;
459
460 fail:
461         fprintf(stderr, "Parsing interfaces failed\n");
462         return false;
463 }
464
465 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
466 {
467         struct vnn_map *vnn_map;
468
469         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
470         if (vnn_map == NULL) {
471                 fprintf(stderr, "Memory error\n");
472                 return NULL;
473         }
474         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
475         vnn_map->generation = INVALID_GENERATION;
476
477         return vnn_map;
478 }
479
480 /* Read vnn map.
481  * output:
482  *   <GENERATION>
483  *   <LMASTER0>
484  *   <LMASTER1>
485  *   ...
486  */
487
488 static bool vnnmap_parse(struct vnn_map *vnn_map)
489 {
490         char line[1024];
491
492         while (fgets(line, sizeof(line), stdin) != NULL) {
493                 uint32_t n;
494                 char *t;
495
496                 if (line[0] == '\n') {
497                         break;
498                 }
499
500                 /* Get rid of pesky newline */
501                 if ((t = strchr(line, '\n')) != NULL) {
502                         *t = '\0';
503                 }
504
505                 n = (uint32_t) strtol(line, NULL, 0);
506
507                 /* generation */
508                 if (vnn_map->generation == INVALID_GENERATION) {
509                         vnn_map->generation = n;
510                         continue;
511                 }
512
513                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
514                                               vnn_map->size + 1);
515                 if (vnn_map->map == NULL) {
516                         fprintf(stderr, "Memory error\n");
517                         goto fail;
518                 }
519
520                 vnn_map->map[vnn_map->size] = n;
521                 vnn_map->size += 1;
522         }
523
524         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
525         return true;
526
527 fail:
528         fprintf(stderr, "Parsing vnnmap failed\n");
529         return false;
530 }
531
532 /*
533  * CTDB context setup
534  */
535
536 static uint32_t new_generation(uint32_t old_generation)
537 {
538         uint32_t generation;
539
540         while (1) {
541                 generation = random();
542                 if (generation != INVALID_GENERATION &&
543                     generation != old_generation) {
544                         break;
545                 }
546         }
547
548         return generation;
549 }
550
551 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
552 {
553         struct ctdbd_context *ctdb;
554         char line[1024];
555         bool status;
556
557         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
558         if (ctdb == NULL) {
559                 return NULL;
560         }
561
562         ctdb->node_map = nodemap_init(ctdb);
563         if (ctdb->node_map == NULL) {
564                 goto fail;
565         }
566
567         ctdb->iface_map = interfaces_init(ctdb);
568         if (ctdb->iface_map == NULL) {
569                 goto fail;
570         }
571
572         ctdb->vnn_map = vnnmap_init(ctdb);
573         if (ctdb->vnn_map == NULL) {
574                 goto fail;
575         }
576
577         while (fgets(line, sizeof(line), stdin) != NULL) {
578                 char *t;
579
580                 if ((t = strchr(line, '\n')) != NULL) {
581                         *t = '\0';
582                 }
583
584                 if (strcmp(line, "NODEMAP") == 0) {
585                         status = nodemap_parse(ctdb->node_map);
586                 } else if (strcmp(line, "IFACES") == 0) {
587                         status = interfaces_parse(ctdb->iface_map);
588                 } else if (strcmp(line, "VNNMAP") == 0) {
589                         status = vnnmap_parse(ctdb->vnn_map);
590                 } else {
591                         fprintf(stderr, "Unknown line %s\n", line);
592                         status = false;
593                 }
594
595                 if (! status) {
596                         goto fail;
597                 }
598         }
599
600         ctdb->start_time = tevent_timeval_current();
601         ctdb->recovery_start_time = tevent_timeval_current();
602         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
603         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
604                 ctdb->vnn_map->generation =
605                         new_generation(ctdb->vnn_map->generation);
606         }
607         ctdb->recovery_end_time = tevent_timeval_current();
608
609         ctdb->log_level = DEBUG_ERR;
610         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
611
612         ctdb_tunable_set_defaults(&ctdb->tun_list);
613
614         return ctdb;
615
616 fail:
617         TALLOC_FREE(ctdb);
618         return NULL;
619 }
620
621 static bool ctdbd_verify(struct ctdbd_context *ctdb)
622 {
623         struct node *node;
624         int i;
625
626         if (ctdb->node_map->num_nodes == 0) {
627                 return true;
628         }
629
630         /* Make sure all the nodes are in order */
631         for (i=0; i<ctdb->node_map->num_nodes; i++) {
632                 node = &ctdb->node_map->node[i];
633                 if (node->pnn != i) {
634                         fprintf(stderr, "Expected node %u, found %u\n",
635                                 i, node->pnn);
636                         return false;
637                 }
638         }
639
640         node = &ctdb->node_map->node[ctdb->node_map->pnn];
641         if (node->flags & NODE_FLAGS_DISCONNECTED) {
642                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
643                 exit(0);
644         }
645
646         return true;
647 }
648
649 /*
650  * Doing a recovery
651  */
652
653 struct recover_state {
654         struct tevent_context *ev;
655         struct ctdbd_context *ctdb;
656 };
657
658 static int recover_check(struct tevent_req *req);
659 static void recover_wait_done(struct tevent_req *subreq);
660 static void recover_done(struct tevent_req *subreq);
661
662 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
663                                        struct tevent_context *ev,
664                                        struct ctdbd_context *ctdb)
665 {
666         struct tevent_req *req;
667         struct recover_state *state;
668         int ret;
669
670         req = tevent_req_create(mem_ctx, &state, struct recover_state);
671         if (req == NULL) {
672                 return NULL;
673         }
674
675         state->ev = ev;
676         state->ctdb = ctdb;
677
678         ret = recover_check(req);
679         if (ret != 0) {
680                 tevent_req_error(req, ret);
681                 return tevent_req_post(req, ev);
682         }
683
684         return req;
685 }
686
687 static int recover_check(struct tevent_req *req)
688 {
689         struct recover_state *state = tevent_req_data(
690                 req, struct recover_state);
691         struct ctdbd_context *ctdb = state->ctdb;
692         struct tevent_req *subreq;
693         bool recovery_disabled;
694         int i;
695
696         recovery_disabled = false;
697         for (i=0; i<ctdb->node_map->num_nodes; i++) {
698                 if (ctdb->node_map->node[i].recovery_disabled) {
699                         recovery_disabled = true;
700                         break;
701                 }
702         }
703
704         subreq = tevent_wakeup_send(state, state->ev,
705                                     tevent_timeval_current_ofs(1, 0));
706         if (subreq == NULL) {
707                 return ENOMEM;
708         }
709
710         if (recovery_disabled) {
711                 tevent_req_set_callback(subreq, recover_wait_done, req);
712         } else {
713                 ctdb->recovery_start_time = tevent_timeval_current();
714                 tevent_req_set_callback(subreq, recover_done, req);
715         }
716
717         return 0;
718 }
719
720 static void recover_wait_done(struct tevent_req *subreq)
721 {
722         struct tevent_req *req = tevent_req_callback_data(
723                 subreq, struct tevent_req);
724         int ret;
725         bool status;
726
727         status = tevent_wakeup_recv(subreq);
728         TALLOC_FREE(subreq);
729         if (! status) {
730                 tevent_req_error(req, EIO);
731                 return;
732         }
733
734         ret = recover_check(req);
735         if (ret != 0) {
736                 tevent_req_error(req, ret);
737         }
738 }
739
740 static void recover_done(struct tevent_req *subreq)
741 {
742         struct tevent_req *req = tevent_req_callback_data(
743                 subreq, struct tevent_req);
744         struct recover_state *state = tevent_req_data(
745                 req, struct recover_state);
746         struct ctdbd_context *ctdb = state->ctdb;
747         bool status;
748
749         status = tevent_wakeup_recv(subreq);
750         TALLOC_FREE(subreq);
751         if (! status) {
752                 tevent_req_error(req, EIO);
753                 return;
754         }
755
756         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
757         ctdb->recovery_end_time = tevent_timeval_current();
758         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
759
760         tevent_req_done(req);
761 }
762
763 static bool recover_recv(struct tevent_req *req, int *perr)
764 {
765         int err;
766
767         if (tevent_req_is_unix_error(req, &err)) {
768                 if (perr != NULL) {
769                         *perr = err;
770                 }
771                 return false;
772         }
773
774         return true;
775 }
776
777 /*
778  * Routines for ctdb_req_header
779  */
780
781 static void header_fix_pnn(struct ctdb_req_header *header,
782                            struct ctdbd_context *ctdb)
783 {
784         if (header->srcnode == CTDB_CURRENT_NODE) {
785                 header->srcnode = ctdb->node_map->pnn;
786         }
787
788         if (header->destnode == CTDB_CURRENT_NODE) {
789                 header->destnode = ctdb->node_map->pnn;
790         }
791 }
792
793 static struct ctdb_req_header header_reply_control(
794                                         struct ctdb_req_header *header,
795                                         struct ctdbd_context *ctdb)
796 {
797         struct ctdb_req_header reply_header;
798
799         reply_header = (struct ctdb_req_header) {
800                 .ctdb_magic = CTDB_MAGIC,
801                 .ctdb_version = CTDB_PROTOCOL,
802                 .generation = ctdb->vnn_map->generation,
803                 .operation = CTDB_REPLY_CONTROL,
804                 .destnode = header->srcnode,
805                 .srcnode = header->destnode,
806                 .reqid = header->reqid,
807         };
808
809         return reply_header;
810 }
811
812 static struct ctdb_req_header header_reply_message(
813                                         struct ctdb_req_header *header,
814                                         struct ctdbd_context *ctdb)
815 {
816         struct ctdb_req_header reply_header;
817
818         reply_header = (struct ctdb_req_header) {
819                 .ctdb_magic = CTDB_MAGIC,
820                 .ctdb_version = CTDB_PROTOCOL,
821                 .generation = ctdb->vnn_map->generation,
822                 .operation = CTDB_REQ_MESSAGE,
823                 .destnode = header->srcnode,
824                 .srcnode = header->destnode,
825                 .reqid = 0,
826         };
827
828         return reply_header;
829 }
830
831 /*
832  * Client state
833  */
834
835 struct client_state {
836         struct tevent_context *ev;
837         int fd;
838         struct ctdbd_context *ctdb;
839         int pnn;
840         struct comm_context *comm;
841         struct srvid_register_state *rstate;
842         int status;
843 };
844
845 /*
846  * Send replies to controls and messages
847  */
848
849 static void client_reply_done(struct tevent_req *subreq);
850
851 static void client_send_message(struct tevent_req *req,
852                                 struct ctdb_req_header *header,
853                                 struct ctdb_req_message_data *message)
854 {
855         struct client_state *state = tevent_req_data(
856                 req, struct client_state);
857         struct ctdbd_context *ctdb = state->ctdb;
858         struct tevent_req *subreq;
859         struct ctdb_req_header reply_header;
860         uint8_t *buf;
861         size_t datalen, buflen;
862         int ret;
863
864         reply_header = header_reply_message(header, ctdb);
865
866         datalen = ctdb_req_message_data_len(&reply_header, message);
867         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
868         if (ret != 0) {
869                 tevent_req_error(req, ret);
870                 return;
871         }
872
873         ret = ctdb_req_message_data_push(&reply_header, message,
874                                          buf, &buflen);
875         if (ret != 0) {
876                 tevent_req_error(req, ret);
877                 return;
878         }
879
880         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
881
882         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
883         if (tevent_req_nomem(subreq, req)) {
884                 return;
885         }
886         tevent_req_set_callback(subreq, client_reply_done, req);
887
888         talloc_steal(subreq, buf);
889 }
890
891 static void client_send_control(struct tevent_req *req,
892                                 struct ctdb_req_header *header,
893                                 struct ctdb_reply_control *reply)
894 {
895         struct client_state *state = tevent_req_data(
896                 req, struct client_state);
897         struct ctdbd_context *ctdb = state->ctdb;
898         struct tevent_req *subreq;
899         struct ctdb_req_header reply_header;
900         uint8_t *buf;
901         size_t datalen, buflen;
902         int ret;
903
904         reply_header = header_reply_control(header, ctdb);
905
906         datalen = ctdb_reply_control_len(&reply_header, reply);
907         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
908         if (ret != 0) {
909                 tevent_req_error(req, ret);
910                 return;
911         }
912
913         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
914         if (ret != 0) {
915                 tevent_req_error(req, ret);
916                 return;
917         }
918
919         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
920
921         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
922         if (tevent_req_nomem(subreq, req)) {
923                 return;
924         }
925         tevent_req_set_callback(subreq, client_reply_done, req);
926
927         talloc_steal(subreq, buf);
928 }
929
930 static void client_reply_done(struct tevent_req *subreq)
931 {
932         struct tevent_req *req = tevent_req_callback_data(
933                 subreq, struct tevent_req);
934         int ret;
935         bool status;
936
937         status = comm_write_recv(subreq, &ret);
938         TALLOC_FREE(subreq);
939         if (! status) {
940                 tevent_req_error(req, ret);
941         }
942 }
943
944 /*
945  * Handling protocol - controls
946  */
947
948 static void control_process_exists(TALLOC_CTX *mem_ctx,
949                                    struct tevent_req *req,
950                                    struct ctdb_req_header *header,
951                                    struct ctdb_req_control *request)
952 {
953         struct ctdb_reply_control reply;
954
955         reply.rdata.opcode = request->opcode;
956         reply.status = kill(request->rdata.data.pid, 0);
957         reply.errmsg = NULL;
958
959         client_send_control(req, header, &reply);
960 }
961
962 static void control_ping(TALLOC_CTX *mem_ctx,
963                          struct tevent_req *req,
964                          struct ctdb_req_header *header,
965                          struct ctdb_req_control *request)
966 {
967         struct client_state *state = tevent_req_data(
968                 req, struct client_state);
969         struct ctdbd_context *ctdb = state->ctdb;
970         struct ctdb_reply_control reply;
971
972         reply.rdata.opcode = request->opcode;
973         reply.status = ctdb->num_clients;
974         reply.errmsg = NULL;
975
976         client_send_control(req, header, &reply);
977 }
978
979 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
980                               struct tevent_req *req,
981                               struct ctdb_req_header *header,
982                               struct ctdb_req_control *request)
983 {
984         struct client_state *state = tevent_req_data(
985                 req, struct client_state);
986         struct ctdbd_context *ctdb = state->ctdb;
987         struct ctdb_reply_control reply;
988         struct ctdb_vnn_map *vnnmap;
989
990         reply.rdata.opcode = request->opcode;
991
992         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
993         if (vnnmap == NULL) {
994                 reply.status = ENOMEM;
995                 reply.errmsg = "Memory error";
996         } else {
997                 vnnmap->generation = ctdb->vnn_map->generation;
998                 vnnmap->size = ctdb->vnn_map->size;
999                 vnnmap->map = ctdb->vnn_map->map;
1000
1001                 reply.rdata.data.vnnmap = vnnmap;
1002                 reply.status = 0;
1003                 reply.errmsg = NULL;
1004         }
1005
1006         client_send_control(req, header, &reply);
1007 }
1008
1009 static void control_get_debug(TALLOC_CTX *mem_ctx,
1010                               struct tevent_req *req,
1011                               struct ctdb_req_header *header,
1012                               struct ctdb_req_control *request)
1013 {
1014         struct client_state *state = tevent_req_data(
1015                 req, struct client_state);
1016         struct ctdbd_context *ctdb = state->ctdb;
1017         struct ctdb_reply_control reply;
1018
1019         reply.rdata.opcode = request->opcode;
1020         reply.rdata.data.loglevel = debug_level_to_int(ctdb->log_level);
1021         reply.status = 0;
1022         reply.errmsg = NULL;
1023
1024         client_send_control(req, header, &reply);
1025 }
1026
1027 static void control_set_debug(TALLOC_CTX *mem_ctx,
1028                               struct tevent_req *req,
1029                               struct ctdb_req_header *header,
1030                               struct ctdb_req_control *request)
1031 {
1032         struct client_state *state = tevent_req_data(
1033                 req, struct client_state);
1034         struct ctdbd_context *ctdb = state->ctdb;
1035         struct ctdb_reply_control reply;
1036
1037         ctdb->log_level = debug_level_from_int(request->rdata.data.loglevel);
1038
1039         reply.rdata.opcode = request->opcode;
1040         reply.status = 0;
1041         reply.errmsg = NULL;
1042
1043         client_send_control(req, header, &reply);
1044 }
1045
1046 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1047                                 struct tevent_req *req,
1048                                 struct ctdb_req_header *header,
1049                                 struct ctdb_req_control *request)
1050 {
1051         struct client_state *state = tevent_req_data(
1052                 req, struct client_state);
1053         struct ctdbd_context *ctdb = state->ctdb;
1054         struct ctdb_reply_control reply;
1055
1056         reply.rdata.opcode = request->opcode;
1057         reply.status = ctdb->vnn_map->recmode;
1058         reply.errmsg = NULL;
1059
1060         client_send_control(req, header, &reply);
1061 }
1062
1063 struct set_recmode_state {
1064         struct tevent_req *req;
1065         struct ctdbd_context *ctdb;
1066         struct ctdb_req_header header;
1067         struct ctdb_reply_control reply;
1068 };
1069
1070 static void set_recmode_callback(struct tevent_req *subreq)
1071 {
1072         struct set_recmode_state *substate = tevent_req_callback_data(
1073                 subreq, struct set_recmode_state);
1074         bool status;
1075         int ret;
1076
1077         status = recover_recv(subreq, &ret);
1078         TALLOC_FREE(subreq);
1079         if (! status) {
1080                 substate->reply.status = ret;
1081                 substate->reply.errmsg = "recovery failed";
1082         } else {
1083                 substate->reply.status = 0;
1084                 substate->reply.errmsg = NULL;
1085         }
1086
1087         client_send_control(substate->req, &substate->header, &substate->reply);
1088         talloc_free(substate);
1089 }
1090
1091 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1092                                 struct tevent_req *req,
1093                                 struct ctdb_req_header *header,
1094                                 struct ctdb_req_control *request)
1095 {
1096         struct client_state *state = tevent_req_data(
1097                 req, struct client_state);
1098         struct tevent_req *subreq;
1099         struct ctdbd_context *ctdb = state->ctdb;
1100         struct set_recmode_state *substate;
1101         struct ctdb_reply_control reply;
1102
1103         reply.rdata.opcode = request->opcode;
1104
1105         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1106                 reply.status = -1;
1107                 reply.errmsg = "Client cannot set recmode to NORMAL";
1108                 goto fail;
1109         }
1110
1111         substate = talloc_zero(ctdb, struct set_recmode_state);
1112         if (substate == NULL) {
1113                 reply.status = -1;
1114                 reply.errmsg = "Memory error";
1115                 goto fail;
1116         }
1117
1118         substate->req = req;
1119         substate->ctdb = ctdb;
1120         substate->header = *header;
1121         substate->reply.rdata.opcode = request->opcode;
1122
1123         subreq = recover_send(substate, state->ev, state->ctdb);
1124         if (subreq == NULL) {
1125                 talloc_free(substate);
1126                 goto fail;
1127         }
1128         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1129
1130         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1131         return;
1132
1133 fail:
1134         client_send_control(req, header, &reply);
1135
1136 }
1137
1138 static int srvid_register_state_destructor(struct srvid_register_state *rstate)
1139 {
1140         DLIST_REMOVE(rstate->ctdb->rstate, rstate);
1141         return 0;
1142 }
1143
1144 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1145                                    struct tevent_req *req,
1146                                    struct ctdb_req_header *header,
1147                                    struct ctdb_req_control *request)
1148 {
1149         struct client_state *state = tevent_req_data(
1150                 req, struct client_state);
1151         struct ctdbd_context *ctdb = state->ctdb;
1152         struct ctdb_reply_control reply;
1153         struct srvid_register_state *rstate;
1154
1155         reply.rdata.opcode = request->opcode;
1156
1157         rstate = talloc_zero(ctdb, struct srvid_register_state);
1158         if (rstate == NULL) {
1159                 reply.status = -1;
1160                 reply.errmsg = "Memory error";
1161                 goto fail;
1162         }
1163         rstate->ctdb = ctdb;
1164         rstate->srvid = request->srvid;
1165
1166         talloc_set_destructor(rstate, srvid_register_state_destructor);
1167
1168         DLIST_ADD_END(ctdb->rstate, rstate);
1169
1170         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", rstate->srvid));
1171
1172         reply.status = 0;
1173         reply.errmsg = NULL;
1174
1175 fail:
1176         client_send_control(req, header, &reply);
1177 }
1178
1179 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1180                                      struct tevent_req *req,
1181                                      struct ctdb_req_header *header,
1182                                      struct ctdb_req_control *request)
1183 {
1184         struct client_state *state = tevent_req_data(
1185                 req, struct client_state);
1186         struct ctdbd_context *ctdb = state->ctdb;
1187         struct ctdb_reply_control reply;
1188         struct srvid_register_state *rstate = NULL;
1189
1190         reply.rdata.opcode = request->opcode;
1191
1192         for (rstate = ctdb->rstate; rstate != NULL; rstate = rstate->next) {
1193                 if (rstate->srvid == request->srvid) {
1194                         break;
1195                 }
1196         }
1197
1198         if (rstate == NULL) {
1199                 reply.status = -1;
1200                 reply.errmsg = "srvid not registered";
1201                 goto fail;
1202         }
1203
1204         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", rstate->srvid));
1205         talloc_free(rstate);
1206
1207         reply.status = 0;
1208         reply.errmsg = NULL;
1209
1210         client_send_control(req, header, &reply);
1211         return;
1212
1213 fail:
1214         TALLOC_FREE(rstate);
1215         client_send_control(req, header, &reply);
1216 }
1217
1218 static void control_get_pid(TALLOC_CTX *mem_ctx,
1219                             struct tevent_req *req,
1220                             struct ctdb_req_header *header,
1221                             struct ctdb_req_control *request)
1222 {
1223         struct ctdb_reply_control reply;
1224
1225         reply.rdata.opcode = request->opcode;
1226         reply.status = getpid();
1227         reply.errmsg = NULL;
1228
1229         client_send_control(req, header, &reply);
1230 }
1231
1232 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1233                                   struct tevent_req *req,
1234                                   struct ctdb_req_header *header,
1235                                   struct ctdb_req_control *request)
1236 {
1237         struct client_state *state = tevent_req_data(
1238                 req, struct client_state);
1239         struct ctdbd_context *ctdb = state->ctdb;
1240         struct ctdb_reply_control reply;
1241
1242         reply.rdata.opcode = request->opcode;
1243         reply.status = ctdb->node_map->recmaster;
1244         reply.errmsg = NULL;
1245
1246         client_send_control(req, header, &reply);
1247 }
1248
1249 static void control_get_pnn(TALLOC_CTX *mem_ctx,
1250                             struct tevent_req *req,
1251                             struct ctdb_req_header *header,
1252                             struct ctdb_req_control *request)
1253 {
1254         struct ctdb_reply_control reply;
1255
1256         reply.rdata.opcode = request->opcode;
1257         reply.status = header->destnode;
1258         reply.errmsg = NULL;
1259
1260         client_send_control(req, header, &reply);
1261 }
1262
1263 static void control_shutdown(TALLOC_CTX *mem_ctx,
1264                              struct tevent_req *req,
1265                              struct ctdb_req_header *hdr,
1266                              struct ctdb_req_control *request)
1267 {
1268         struct client_state *state = tevent_req_data(
1269                 req, struct client_state);
1270
1271         state->status = 99;
1272 }
1273
1274 static void control_set_tunable(TALLOC_CTX *mem_ctx,
1275                                 struct tevent_req *req,
1276                                 struct ctdb_req_header *header,
1277                                 struct ctdb_req_control *request)
1278 {
1279         struct client_state *state = tevent_req_data(
1280                 req, struct client_state);
1281         struct ctdbd_context *ctdb = state->ctdb;
1282         struct ctdb_reply_control reply;
1283         bool ret, obsolete;
1284
1285         reply.rdata.opcode = request->opcode;
1286         reply.errmsg = NULL;
1287
1288         ret = ctdb_tunable_set_value(&ctdb->tun_list,
1289                                      request->rdata.data.tunable->name,
1290                                      request->rdata.data.tunable->value,
1291                                      &obsolete);
1292         if (! ret) {
1293                 reply.status = -1;
1294         } else if (obsolete) {
1295                 reply.status = 1;
1296         } else {
1297                 reply.status = 0;
1298         }
1299
1300         client_send_control(req, header, &reply);
1301 }
1302
1303 static void control_get_tunable(TALLOC_CTX *mem_ctx,
1304                                 struct tevent_req *req,
1305                                 struct ctdb_req_header *header,
1306                                 struct ctdb_req_control *request)
1307 {
1308         struct client_state *state = tevent_req_data(
1309                 req, struct client_state);
1310         struct ctdbd_context *ctdb = state->ctdb;
1311         struct ctdb_reply_control reply;
1312         uint32_t value;
1313         bool ret;
1314
1315         reply.rdata.opcode = request->opcode;
1316         reply.errmsg = NULL;
1317
1318         ret = ctdb_tunable_get_value(&ctdb->tun_list,
1319                                      request->rdata.data.tun_var, &value);
1320         if (! ret) {
1321                 reply.status = -1;
1322         } else {
1323                 reply.rdata.data.tun_value = value;
1324                 reply.status = 0;
1325         }
1326
1327         client_send_control(req, header, &reply);
1328 }
1329
1330 static void control_list_tunables(TALLOC_CTX *mem_ctx,
1331                                   struct tevent_req *req,
1332                                   struct ctdb_req_header *header,
1333                                   struct ctdb_req_control *request)
1334 {
1335         struct ctdb_reply_control reply;
1336         struct ctdb_var_list *var_list;
1337
1338         reply.rdata.opcode = request->opcode;
1339         reply.errmsg = NULL;
1340
1341         var_list = ctdb_tunable_names(mem_ctx);
1342         if (var_list == NULL) {
1343                 reply.status = -1;
1344         } else {
1345                 reply.rdata.data.tun_var_list = var_list;
1346                 reply.status = 0;
1347         }
1348
1349         client_send_control(req, header, &reply);
1350 }
1351
1352 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
1353                                      struct tevent_req *req,
1354                                      struct ctdb_req_header *header,
1355                                      struct ctdb_req_control *request)
1356 {
1357         struct client_state *state = tevent_req_data(
1358                 req, struct client_state);
1359         struct ctdbd_context *ctdb = state->ctdb;
1360         struct ctdb_reply_control reply;
1361
1362         reply.rdata.opcode = request->opcode;
1363         reply.rdata.data.tun_list = &ctdb->tun_list;
1364         reply.status = 0;
1365         reply.errmsg = NULL;
1366
1367         client_send_control(req, header, &reply);
1368 }
1369
1370 static void control_uptime(TALLOC_CTX *mem_ctx,
1371                            struct tevent_req *req,
1372                            struct ctdb_req_header *header,
1373                            struct ctdb_req_control *request)
1374 {
1375         struct client_state *state = tevent_req_data(
1376                 req, struct client_state);
1377         struct ctdbd_context *ctdb = state->ctdb;
1378         struct ctdb_reply_control reply;
1379         struct ctdb_uptime *uptime;;
1380
1381         reply.rdata.opcode = request->opcode;
1382
1383         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
1384         if (uptime == NULL) {
1385                 goto fail;
1386         }
1387
1388         uptime->current_time = tevent_timeval_current();
1389         uptime->ctdbd_start_time = ctdb->start_time;
1390         uptime->last_recovery_started = ctdb->recovery_start_time;
1391         uptime->last_recovery_finished = ctdb->recovery_end_time;
1392
1393         reply.rdata.data.uptime = uptime;
1394         reply.status = 0;
1395         reply.errmsg = NULL;
1396         client_send_control(req, header, &reply);
1397         return;
1398
1399 fail:
1400         reply.status = -1;
1401         reply.errmsg = "Memory error";
1402         client_send_control(req, header, &reply);
1403 }
1404
1405 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
1406                                       struct tevent_req *req,
1407                                       struct ctdb_req_header *header,
1408                                       struct ctdb_req_control *request)
1409 {
1410         struct client_state *state = tevent_req_data(
1411                 req, struct client_state);
1412         struct ctdbd_context *ctdb = state->ctdb;
1413         struct ctdb_reply_control reply;
1414         struct ctdb_node_map *nodemap;
1415         struct node_map *node_map = ctdb->node_map;
1416         int i;
1417
1418         reply.rdata.opcode = request->opcode;
1419
1420         nodemap = read_nodes_file(mem_ctx, header->destnode);
1421         if (nodemap == NULL) {
1422                 goto fail;
1423         }
1424
1425         for (i=0; i<nodemap->num; i++) {
1426                 struct node *node;
1427
1428                 if (i < node_map->num_nodes &&
1429                     ctdb_sock_addr_same(&nodemap->node[i].addr,
1430                                         &node_map->node[i].addr)) {
1431                         continue;
1432                 }
1433
1434                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
1435                         node = &node_map->node[i];
1436
1437                         node->flags |= NODE_FLAGS_DELETED;
1438                         parse_ip("0.0.0.0", NULL, 0, &node->addr);
1439
1440                         continue;
1441                 }
1442
1443                 if (i < node_map->num_nodes &&
1444                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
1445                         node = &node_map->node[i];
1446
1447                         node->flags &= ~NODE_FLAGS_DELETED;
1448                         node->addr = nodemap->node[i].addr;
1449
1450                         continue;
1451                 }
1452
1453                 node_map->node = talloc_realloc(node_map, node_map->node,
1454                                                 struct node,
1455                                                 node_map->num_nodes+1);
1456                 if (node_map->node == NULL) {
1457                         goto fail;
1458                 }
1459                 node = &node_map->node[node_map->num_nodes];
1460
1461                 node->addr = nodemap->node[i].addr;
1462                 node->pnn = nodemap->node[i].pnn;
1463                 node->flags = 0;
1464                 node->capabilities = CTDB_CAP_DEFAULT;
1465                 node->recovery_disabled = false;
1466                 node->recovery_substate = NULL;
1467
1468                 node_map->num_nodes += 1;
1469         }
1470
1471         talloc_free(nodemap);
1472
1473         reply.status = 0;
1474         reply.errmsg = NULL;
1475         client_send_control(req, header, &reply);
1476         return;
1477
1478 fail:
1479         reply.status = -1;
1480         reply.errmsg = "Memory error";
1481         client_send_control(req, header, &reply);
1482 }
1483
1484 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
1485                                      struct tevent_req *req,
1486                                      struct ctdb_req_header *header,
1487                                      struct ctdb_req_control *request)
1488 {
1489         struct client_state *state = tevent_req_data(
1490                 req, struct client_state);
1491         struct ctdbd_context *ctdb = state->ctdb;
1492         struct ctdb_reply_control reply;
1493         struct node *node;
1494         uint32_t caps = 0;
1495
1496         reply.rdata.opcode = request->opcode;
1497
1498         node = &ctdb->node_map->node[header->destnode];
1499         caps = node->capabilities;
1500
1501         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
1502                 /* Don't send reply */
1503                 return;
1504         }
1505
1506         reply.rdata.data.caps = caps;
1507         reply.status = 0;
1508         reply.errmsg = NULL;
1509
1510         client_send_control(req, header, &reply);
1511 }
1512
1513 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
1514                                 struct tevent_req *req,
1515                                 struct ctdb_req_header *header,
1516                                 struct ctdb_req_control *request)
1517 {
1518         struct client_state *state = tevent_req_data(
1519                 req, struct client_state);
1520         struct ctdbd_context *ctdb = state->ctdb;
1521         struct ctdb_reply_control reply;
1522         struct ctdb_node_map *nodemap;
1523         struct node *node;
1524         int i;
1525
1526         reply.rdata.opcode = request->opcode;
1527
1528         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
1529         if (nodemap == NULL) {
1530                 goto fail;
1531         }
1532
1533         nodemap->num = ctdb->node_map->num_nodes;
1534         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
1535                                      nodemap->num);
1536         if (nodemap->node == NULL) {
1537                 goto fail;
1538         }
1539
1540         for (i=0; i<nodemap->num; i++) {
1541                 node = &ctdb->node_map->node[i];
1542                 nodemap->node[i] = (struct ctdb_node_and_flags) {
1543                         .pnn = node->pnn,
1544                         .flags = node->flags,
1545                         .addr = node->addr,
1546                 };
1547         }
1548
1549         reply.rdata.data.nodemap = nodemap;
1550         reply.status = 0;
1551         reply.errmsg = NULL;
1552         client_send_control(req, header, &reply);
1553         return;
1554
1555 fail:
1556         reply.status = -1;
1557         reply.errmsg = "Memory error";
1558         client_send_control(req, header, &reply);
1559 }
1560
1561 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
1562                                struct tevent_req *req,
1563                                struct ctdb_req_header *header,
1564                                struct ctdb_req_control *request)
1565 {
1566         struct client_state *state = tevent_req_data(
1567                 req, struct client_state);
1568         struct ctdbd_context *ctdb = state->ctdb;
1569         struct ctdb_reply_control reply;
1570         struct ctdb_iface_list *iface_list;
1571         struct interface *iface;
1572         int i;
1573
1574         reply.rdata.opcode = request->opcode;
1575
1576         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
1577         if (iface_list == NULL) {
1578                 goto fail;
1579         }
1580
1581         iface_list->num = ctdb->iface_map->num;
1582         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
1583                                          iface_list->num);
1584         if (iface_list->iface == NULL) {
1585                 goto fail;
1586         }
1587
1588         for (i=0; i<iface_list->num; i++) {
1589                 iface = &ctdb->iface_map->iface[i];
1590                 iface_list->iface[i] = (struct ctdb_iface) {
1591                         .link_state = iface->link_up,
1592                         .references = iface->references,
1593                 };
1594                 strncpy(iface_list->iface[i].name, iface->name,
1595                         CTDB_IFACE_SIZE+2);
1596         }
1597
1598         reply.rdata.data.iface_list = iface_list;
1599         reply.status = 0;
1600         reply.errmsg = NULL;
1601         client_send_control(req, header, &reply);
1602         return;
1603
1604 fail:
1605         reply.status = -1;
1606         reply.errmsg = "Memory error";
1607         client_send_control(req, header, &reply);
1608 }
1609
1610 static void control_get_runstate(TALLOC_CTX *mem_ctx,
1611                                  struct tevent_req *req,
1612                                  struct ctdb_req_header *header,
1613                                  struct ctdb_req_control *request)
1614 {
1615         struct client_state *state = tevent_req_data(
1616                 req, struct client_state);
1617         struct ctdbd_context *ctdb = state->ctdb;
1618         struct ctdb_reply_control reply;
1619
1620         reply.rdata.opcode = request->opcode;
1621         reply.rdata.data.runstate = ctdb->runstate;
1622         reply.status = 0;
1623         reply.errmsg = NULL;
1624
1625         client_send_control(req, header, &reply);
1626 }
1627
1628 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
1629                                    struct tevent_req *req,
1630                                    struct ctdb_req_header *header,
1631                                    struct ctdb_req_control *request)
1632 {
1633         struct ctdb_reply_control reply;
1634         struct ctdb_node_map *nodemap;
1635
1636         reply.rdata.opcode = request->opcode;
1637
1638         nodemap = read_nodes_file(mem_ctx, header->destnode);
1639         if (nodemap == NULL) {
1640                 goto fail;
1641         }
1642
1643         reply.rdata.data.nodemap = nodemap;
1644         reply.status = 0;
1645         reply.errmsg = NULL;
1646         client_send_control(req, header, &reply);
1647         return;
1648
1649 fail:
1650         reply.status = -1;
1651         reply.errmsg = "Failed to read nodes file";
1652         client_send_control(req, header, &reply);
1653 }
1654
1655 static void control_error(TALLOC_CTX *mem_ctx,
1656                           struct tevent_req *req,
1657                           struct ctdb_req_header *header,
1658                           struct ctdb_req_control *request)
1659 {
1660         struct ctdb_reply_control reply;
1661
1662         reply.rdata.opcode = request->opcode;
1663         reply.status = -1;
1664         reply.errmsg = "Not implemented";
1665
1666         client_send_control(req, header, &reply);
1667 }
1668
1669 /*
1670  * Handling protocol - messages
1671  */
1672
1673 struct disable_recoveries_state {
1674         struct node *node;
1675 };
1676
1677 static void disable_recoveries_callback(struct tevent_req *subreq)
1678 {
1679         struct disable_recoveries_state *substate = tevent_req_callback_data(
1680                 subreq, struct disable_recoveries_state);
1681         bool status;
1682
1683         status = tevent_wakeup_recv(subreq);
1684         TALLOC_FREE(subreq);
1685         if (! status) {
1686                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
1687         }
1688
1689         substate->node->recovery_disabled = false;
1690         TALLOC_FREE(substate->node->recovery_substate);
1691 }
1692
1693 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
1694                                        struct tevent_req *req,
1695                                        struct ctdb_req_header *header,
1696                                        struct ctdb_req_message *request)
1697 {
1698         struct client_state *state = tevent_req_data(
1699                 req, struct client_state);
1700         struct tevent_req *subreq;
1701         struct ctdbd_context *ctdb = state->ctdb;
1702         struct disable_recoveries_state *substate;
1703         struct ctdb_disable_message *disable = request->data.disable;
1704         struct ctdb_req_message_data reply;
1705         struct node *node;
1706         int ret = -1;
1707         TDB_DATA data;
1708
1709         node = &ctdb->node_map->node[header->destnode];
1710
1711         if (disable->timeout == 0) {
1712                 TALLOC_FREE(node->recovery_substate);
1713                 node->recovery_disabled = false;
1714                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
1715                                    header->destnode));
1716                 goto done;
1717         }
1718
1719         substate = talloc_zero(ctdb->node_map,
1720                                struct disable_recoveries_state);
1721         if (substate == NULL) {
1722                 goto fail;
1723         }
1724
1725         substate->node = node;
1726
1727         subreq = tevent_wakeup_send(substate, state->ev,
1728                                     tevent_timeval_current_ofs(
1729                                             disable->timeout, 0));
1730         if (subreq == NULL) {
1731                 talloc_free(substate);
1732                 goto fail;
1733         }
1734         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
1735
1736         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
1737                            disable->timeout, header->destnode));
1738         node->recovery_substate = substate;
1739         node->recovery_disabled = true;
1740
1741 done:
1742         ret = header->destnode;
1743
1744 fail:
1745         reply.srvid = disable->srvid;
1746         data.dptr = (uint8_t *)&ret;
1747         data.dsize = sizeof(int);
1748         reply.data = data;
1749
1750         client_send_message(req, header, &reply);
1751 }
1752
1753 /*
1754  * Handle a single client
1755  */
1756
1757 static void client_read_handler(uint8_t *buf, size_t buflen,
1758                                 void *private_data);
1759 static void client_dead_handler(void *private_data);
1760 static void client_process_packet(struct tevent_req *req,
1761                                   uint8_t *buf, size_t buflen);
1762 static void client_process_message(struct tevent_req *req,
1763                                    uint8_t *buf, size_t buflen);
1764 static void client_process_control(struct tevent_req *req,
1765                                    uint8_t *buf, size_t buflen);
1766 static void client_reply_done(struct tevent_req *subreq);
1767
1768 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
1769                                       struct tevent_context *ev,
1770                                       int fd, struct ctdbd_context *ctdb,
1771                                       int pnn)
1772 {
1773         struct tevent_req *req;
1774         struct client_state *state;
1775         int ret;
1776
1777         req = tevent_req_create(mem_ctx, &state, struct client_state);
1778         if (req == NULL) {
1779                 return NULL;
1780         }
1781
1782         state->ev = ev;
1783         state->fd = fd;
1784         state->ctdb = ctdb;
1785         state->pnn = pnn;
1786
1787         ret = comm_setup(state, ev, fd, client_read_handler, req,
1788                          client_dead_handler, req, &state->comm);
1789         if (ret != 0) {
1790                 tevent_req_error(req, ret);
1791                 return tevent_req_post(req, ev);
1792         }
1793
1794         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
1795
1796         return req;
1797 }
1798
1799 static void client_read_handler(uint8_t *buf, size_t buflen,
1800                                 void *private_data)
1801 {
1802         struct tevent_req *req = talloc_get_type_abort(
1803                 private_data, struct tevent_req);
1804         struct client_state *state = tevent_req_data(
1805                 req, struct client_state);
1806         struct ctdbd_context *ctdb = state->ctdb;
1807         struct ctdb_req_header header;
1808         int ret, i;
1809
1810         ret = ctdb_req_header_pull(buf, buflen, &header);
1811         if (ret != 0) {
1812                 return;
1813         }
1814
1815         if (buflen != header.length) {
1816                 return;
1817         }
1818
1819         ret = ctdb_req_header_verify(&header, 0);
1820         if (ret != 0) {
1821                 return;
1822         }
1823
1824         header_fix_pnn(&header, ctdb);
1825
1826         if (header.destnode == CTDB_BROADCAST_ALL) {
1827                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
1828                         header.destnode = i;
1829
1830                         ctdb_req_header_push(&header, buf);
1831                         client_process_packet(req, buf, buflen);
1832                 }
1833                 return;
1834         }
1835
1836         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
1837                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
1838                         if (ctdb->node_map->node[i].flags &
1839                             NODE_FLAGS_DISCONNECTED) {
1840                                 continue;
1841                         }
1842
1843                         header.destnode = i;
1844
1845                         ctdb_req_header_push(&header, buf);
1846                         client_process_packet(req, buf, buflen);
1847                 }
1848                 return;
1849         }
1850
1851         if (header.destnode > ctdb->node_map->num_nodes) {
1852                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
1853                         header.destnode);
1854                 return;
1855         }
1856
1857
1858         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
1859                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
1860                         header.destnode);
1861                 return;
1862         }
1863
1864         ctdb_req_header_push(&header, buf);
1865         client_process_packet(req, buf, buflen);
1866 }
1867
1868 static void client_dead_handler(void *private_data)
1869 {
1870         struct tevent_req *req = talloc_get_type_abort(
1871                 private_data, struct tevent_req);
1872
1873         tevent_req_done(req);
1874 }
1875
1876 static void client_process_packet(struct tevent_req *req,
1877                                   uint8_t *buf, size_t buflen)
1878 {
1879         struct ctdb_req_header header;
1880         int ret;
1881
1882         ret = ctdb_req_header_pull(buf, buflen, &header);
1883         if (ret != 0) {
1884                 return;
1885         }
1886
1887         switch (header.operation) {
1888         case CTDB_REQ_MESSAGE:
1889                 client_process_message(req, buf, buflen);
1890                 break;
1891
1892         case CTDB_REQ_CONTROL:
1893                 client_process_control(req, buf, buflen);
1894                 break;
1895
1896         default:
1897                 break;
1898         }
1899 }
1900
1901 static void client_process_message(struct tevent_req *req,
1902                                    uint8_t *buf, size_t buflen)
1903 {
1904         struct client_state *state = tevent_req_data(
1905                 req, struct client_state);
1906         struct ctdbd_context *ctdb = state->ctdb;
1907         TALLOC_CTX *mem_ctx;
1908         struct ctdb_req_header header;
1909         struct ctdb_req_message request;
1910         uint64_t srvid;
1911         int ret;
1912
1913         mem_ctx = talloc_new(state);
1914         if (tevent_req_nomem(mem_ctx, req)) {
1915                 return;
1916         }
1917
1918         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
1919         if (ret != 0) {
1920                 talloc_free(mem_ctx);
1921                 tevent_req_error(req, ret);
1922                 return;
1923         }
1924
1925         header_fix_pnn(&header, ctdb);
1926
1927         if (header.destnode >= ctdb->node_map->num_nodes) {
1928                 /* Many messages are not replied to, so just behave as
1929                  * though this message was not received */
1930                 fprintf(stderr, "Invalid node %d\n", header.destnode);
1931                 talloc_free(mem_ctx);
1932                 return;
1933         }
1934
1935         srvid = request.srvid;
1936         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
1937
1938         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
1939                 message_disable_recoveries(mem_ctx, req, &header, &request);
1940         }
1941
1942         /* check srvid */
1943         talloc_free(mem_ctx);
1944 }
1945
1946 static void client_process_control(struct tevent_req *req,
1947                                    uint8_t *buf, size_t buflen)
1948 {
1949         struct client_state *state = tevent_req_data(
1950                 req, struct client_state);
1951         struct ctdbd_context *ctdb = state->ctdb;
1952         TALLOC_CTX *mem_ctx;
1953         struct ctdb_req_header header;
1954         struct ctdb_req_control request;
1955         int ret;
1956
1957         mem_ctx = talloc_new(state);
1958         if (tevent_req_nomem(mem_ctx, req)) {
1959                 return;
1960         }
1961
1962         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
1963         if (ret != 0) {
1964                 talloc_free(mem_ctx);
1965                 tevent_req_error(req, ret);
1966                 return;
1967         }
1968
1969         header_fix_pnn(&header, ctdb);
1970
1971         if (header.destnode >= ctdb->node_map->num_nodes) {
1972                 struct ctdb_reply_control reply;
1973
1974                 reply.rdata.opcode = request.opcode;
1975                 reply.errmsg = "Invalid node";
1976                 reply.status = -1;
1977                 client_send_control(req, &header, &reply);
1978                 return;
1979         }
1980
1981         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
1982                            request.opcode, header.reqid));
1983
1984         switch (request.opcode) {
1985         case CTDB_CONTROL_PROCESS_EXISTS:
1986                 control_process_exists(mem_ctx, req, &header, &request);
1987                 break;
1988
1989         case CTDB_CONTROL_PING:
1990                 control_ping(mem_ctx, req, &header, &request);
1991                 break;
1992
1993         case CTDB_CONTROL_GETVNNMAP:
1994                 control_getvnnmap(mem_ctx, req, &header, &request);
1995                 break;
1996
1997         case CTDB_CONTROL_GET_DEBUG:
1998                 control_get_debug(mem_ctx, req, &header, &request);
1999                 break;
2000
2001         case CTDB_CONTROL_SET_DEBUG:
2002                 control_set_debug(mem_ctx, req, &header, &request);
2003                 break;
2004
2005         case CTDB_CONTROL_GET_RECMODE:
2006                 control_get_recmode(mem_ctx, req, &header, &request);
2007                 break;
2008
2009         case CTDB_CONTROL_SET_RECMODE:
2010                 control_set_recmode(mem_ctx, req, &header, &request);
2011                 break;
2012
2013         case CTDB_CONTROL_REGISTER_SRVID:
2014                 control_register_srvid(mem_ctx, req, &header, &request);
2015                 break;
2016
2017         case CTDB_CONTROL_DEREGISTER_SRVID:
2018                 control_deregister_srvid(mem_ctx, req, &header, &request);
2019                 break;
2020
2021         case CTDB_CONTROL_GET_PID:
2022                 control_get_pid(mem_ctx, req, &header, &request);
2023                 break;
2024
2025         case CTDB_CONTROL_GET_RECMASTER:
2026                 control_get_recmaster(mem_ctx, req, &header, &request);
2027                 break;
2028
2029         case CTDB_CONTROL_GET_PNN:
2030                 control_get_pnn(mem_ctx, req, &header, &request);
2031                 break;
2032
2033         case CTDB_CONTROL_SHUTDOWN:
2034                 control_shutdown(mem_ctx, req, &header, &request);
2035                 break;
2036
2037         case CTDB_CONTROL_SET_TUNABLE:
2038                 control_set_tunable(mem_ctx, req, &header, &request);
2039                 break;
2040
2041         case CTDB_CONTROL_GET_TUNABLE:
2042                 control_get_tunable(mem_ctx, req, &header, &request);
2043                 break;
2044
2045         case CTDB_CONTROL_LIST_TUNABLES:
2046                 control_list_tunables(mem_ctx, req, &header, &request);
2047                 break;
2048
2049         case CTDB_CONTROL_GET_ALL_TUNABLES:
2050                 control_get_all_tunables(mem_ctx, req, &header, &request);
2051                 break;
2052
2053         case CTDB_CONTROL_UPTIME:
2054                 control_uptime(mem_ctx, req, &header, &request);
2055                 break;
2056
2057         case CTDB_CONTROL_RELOAD_NODES_FILE:
2058                 control_reload_nodes_file(mem_ctx, req, &header, &request);
2059                 break;
2060
2061         case CTDB_CONTROL_GET_CAPABILITIES:
2062                 control_get_capabilities(mem_ctx, req, &header, &request);
2063                 break;
2064
2065         case CTDB_CONTROL_GET_NODEMAP:
2066                 control_get_nodemap(mem_ctx, req, &header, &request);
2067                 break;
2068
2069         case CTDB_CONTROL_GET_IFACES:
2070                 control_get_ifaces(mem_ctx, req, &header, &request);
2071                 break;
2072
2073         case CTDB_CONTROL_GET_RUNSTATE:
2074                 control_get_runstate(mem_ctx, req, &header, &request);
2075                 break;
2076
2077         case CTDB_CONTROL_GET_NODES_FILE:
2078                 control_get_nodes_file(mem_ctx, req, &header, &request);
2079                 break;
2080
2081         default:
2082                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
2083                         control_error(mem_ctx, req, &header, &request);
2084                 }
2085                 break;
2086         }
2087
2088         talloc_free(mem_ctx);
2089 }
2090
2091 static int client_recv(struct tevent_req *req, int *perr)
2092 {
2093         struct client_state *state = tevent_req_data(
2094                 req, struct client_state);
2095         int err;
2096
2097         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
2098         close(state->fd);
2099
2100         if (tevent_req_is_unix_error(req, &err)) {
2101                 if (perr != NULL) {
2102                         *perr = err;
2103                 }
2104                 return -1;
2105         }
2106
2107         return state->status;
2108 }
2109
2110 /*
2111  * Fake CTDB server
2112  */
2113
2114 struct server_state {
2115         struct tevent_context *ev;
2116         struct ctdbd_context *ctdb;
2117         int fd;
2118 };
2119
2120 static void server_new_client(struct tevent_req *subreq);
2121 static void server_client_done(struct tevent_req *subreq);
2122
2123 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
2124                                       struct tevent_context *ev,
2125                                       struct ctdbd_context *ctdb,
2126                                       int fd)
2127 {
2128         struct tevent_req *req, *subreq;
2129         struct server_state *state;
2130
2131         req = tevent_req_create(mem_ctx, &state, struct server_state);
2132         if (req == NULL) {
2133                 return NULL;
2134         }
2135
2136         state->ev = ev;
2137         state->ctdb = ctdb;
2138         state->fd = fd;
2139
2140         subreq = accept_send(state, ev, fd);
2141         if (tevent_req_nomem(subreq, req)) {
2142                 return tevent_req_post(req, ev);
2143         }
2144         tevent_req_set_callback(subreq, server_new_client, req);
2145
2146         return req;
2147 }
2148
2149 static void server_new_client(struct tevent_req *subreq)
2150 {
2151         struct tevent_req *req = tevent_req_callback_data(
2152                 subreq, struct tevent_req);
2153         struct server_state *state = tevent_req_data(
2154                 req, struct server_state);
2155         struct ctdbd_context *ctdb = state->ctdb;
2156         int client_fd;
2157         int ret = 0;
2158
2159         client_fd = accept_recv(subreq, NULL, NULL, &ret);
2160         TALLOC_FREE(subreq);
2161         if (client_fd == -1) {
2162                 tevent_req_error(req, ret);
2163                 return;
2164         }
2165
2166         subreq = client_send(state, state->ev, client_fd,
2167                              ctdb, ctdb->node_map->pnn);
2168         if (tevent_req_nomem(subreq, req)) {
2169                 return;
2170         }
2171         tevent_req_set_callback(subreq, server_client_done, req);
2172
2173         ctdb->num_clients += 1;
2174
2175         subreq = accept_send(state, state->ev, state->fd);
2176         if (tevent_req_nomem(subreq, req)) {
2177                 return;
2178         }
2179         tevent_req_set_callback(subreq, server_new_client, req);
2180 }
2181
2182 static void server_client_done(struct tevent_req *subreq)
2183 {
2184         struct tevent_req *req = tevent_req_callback_data(
2185                 subreq, struct tevent_req);
2186         struct server_state *state = tevent_req_data(
2187                 req, struct server_state);
2188         struct ctdbd_context *ctdb = state->ctdb;
2189         int ret = 0;
2190         int status;
2191
2192         status = client_recv(subreq, &ret);
2193         TALLOC_FREE(subreq);
2194         if (status < 0) {
2195                 tevent_req_error(req, ret);
2196                 return;
2197         }
2198
2199         ctdb->num_clients -= 1;
2200
2201         if (status == 99) {
2202                 /* Special status, to shutdown server */
2203                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
2204                 tevent_req_done(req);
2205         }
2206 }
2207
2208 static bool server_recv(struct tevent_req *req, int *perr)
2209 {
2210         int err;
2211
2212         if (tevent_req_is_unix_error(req, &err)) {
2213                 if (perr != NULL) {
2214                         *perr = err;
2215                 }
2216                 return false;
2217         }
2218         return true;
2219 }
2220
2221 /*
2222  * Main functions
2223  */
2224
2225 static int socket_init(const char *sockpath)
2226 {
2227         struct sockaddr_un addr;
2228         size_t len;
2229         int ret, fd;
2230
2231         memset(&addr, 0, sizeof(addr));
2232         addr.sun_family = AF_UNIX;
2233
2234         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
2235         if (len >= sizeof(addr.sun_path)) {
2236                 fprintf(stderr, "path too long: %s\n", sockpath);
2237                 return -1;
2238         }
2239
2240         fd = socket(AF_UNIX, SOCK_STREAM, 0);
2241         if (fd == -1) {
2242                 fprintf(stderr, "socket failed - %s\n", sockpath);
2243                 return -1;
2244         }
2245
2246         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
2247         if (ret != 0) {
2248                 fprintf(stderr, "bind failed - %s\n", sockpath);
2249                 goto fail;
2250         }
2251
2252         ret = listen(fd, 10);
2253         if (ret != 0) {
2254                 fprintf(stderr, "listen failed\n");
2255                 goto fail;
2256         }
2257
2258         DEBUG(DEBUG_INFO, ("Socket init done\n"));
2259
2260         return fd;
2261
2262 fail:
2263         if (fd != -1) {
2264                 close(fd);
2265         }
2266         return -1;
2267 }
2268
2269 static struct options {
2270         const char *sockpath;
2271         const char *pidfile;
2272         const char *debuglevel;
2273 } options;
2274
2275 static struct poptOption cmdline_options[] = {
2276         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
2277                 "Unix domain socket path", "filename" },
2278         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
2279                 "pid file", "filename" } ,
2280         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
2281                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
2282 };
2283
2284 static void cleanup(void)
2285 {
2286         unlink(options.sockpath);
2287         unlink(options.pidfile);
2288 }
2289
2290 static void signal_handler(int sig)
2291 {
2292         cleanup();
2293         exit(0);
2294 }
2295
2296 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2297                          struct ctdbd_context *ctdb, int fd, int pfd)
2298 {
2299         struct tevent_req *req;
2300         int ret = 0;
2301         ssize_t len;
2302
2303         atexit(cleanup);
2304         signal(SIGTERM, signal_handler);
2305
2306         req = server_send(mem_ctx, ev, ctdb, fd);
2307         if (req == NULL) {
2308                 fprintf(stderr, "Memory error\n");
2309                 exit(1);
2310         }
2311
2312         len = write(pfd, &ret, sizeof(ret));
2313         if (len != sizeof(ret)) {
2314                 fprintf(stderr, "Failed to send message to parent\n");
2315                 exit(1);
2316         }
2317         close(pfd);
2318
2319         tevent_req_poll(req, ev);
2320
2321         server_recv(req, &ret);
2322         if (ret != 0) {
2323                 exit(1);
2324         }
2325 }
2326
2327 int main(int argc, const char *argv[])
2328 {
2329         TALLOC_CTX *mem_ctx;
2330         struct ctdbd_context *ctdb;
2331         struct tevent_context *ev;
2332         enum debug_level debug_level;
2333         poptContext pc;
2334         int opt, fd, ret, pfd[2];
2335         ssize_t len;
2336         pid_t pid;
2337         FILE *fp;
2338
2339         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
2340                             POPT_CONTEXT_KEEP_FIRST);
2341         while ((opt = poptGetNextOpt(pc)) != -1) {
2342                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
2343                 exit(1);
2344         }
2345
2346         if (options.sockpath == NULL) {
2347                 fprintf(stderr, "Please specify socket path\n");
2348                 poptPrintHelp(pc, stdout, 0);
2349                 exit(1);
2350         }
2351
2352         if (options.pidfile == NULL) {
2353                 fprintf(stderr, "Please specify pid file\n");
2354                 poptPrintHelp(pc, stdout, 0);
2355                 exit(1);
2356         }
2357
2358         if (options.debuglevel == NULL) {
2359                 DEBUGLEVEL = debug_level_to_int(DEBUG_ERR);
2360         } else {
2361                 if (debug_level_parse(options.debuglevel, &debug_level)) {
2362                         DEBUGLEVEL = debug_level_to_int(debug_level);
2363                 } else {
2364                         fprintf(stderr, "Invalid debug level\n");
2365                         poptPrintHelp(pc, stdout, 0);
2366                         exit(1);
2367                 }
2368         }
2369
2370         mem_ctx = talloc_new(NULL);
2371         if (mem_ctx == NULL) {
2372                 fprintf(stderr, "Memory error\n");
2373                 exit(1);
2374         }
2375
2376         ctdb = ctdbd_setup(mem_ctx);
2377         if (ctdb == NULL) {
2378                 exit(1);
2379         }
2380
2381         if (! ctdbd_verify(ctdb)) {
2382                 exit(1);
2383         }
2384
2385         ev = tevent_context_init(mem_ctx);
2386         if (ev == NULL) {
2387                 fprintf(stderr, "Memory error\n");
2388                 exit(1);
2389         }
2390
2391         fd = socket_init(options.sockpath);
2392         if (fd == -1) {
2393                 exit(1);
2394         }
2395
2396         ret = pipe(pfd);
2397         if (ret != 0) {
2398                 fprintf(stderr, "Failed to create pipe\n");
2399                 cleanup();
2400                 exit(1);
2401         }
2402
2403         pid = fork();
2404         if (pid == -1) {
2405                 fprintf(stderr, "Failed to fork\n");
2406                 cleanup();
2407                 exit(1);
2408         }
2409
2410         if (pid == 0) {
2411                 /* Child */
2412                 close(pfd[0]);
2413                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
2414                 exit(1);
2415         }
2416
2417         /* Parent */
2418         close(pfd[1]);
2419
2420         len = read(pfd[0], &ret, sizeof(ret));
2421         close(pfd[0]);
2422         if (len != sizeof(ret)) {
2423                 fprintf(stderr, "len = %zi\n", len);
2424                 fprintf(stderr, "Failed to get message from child\n");
2425                 kill(pid, SIGTERM);
2426                 exit(1);
2427         }
2428
2429         fp = fopen(options.pidfile, "w");
2430         if (fp == NULL) {
2431                 fprintf(stderr, "Failed to open pid file %s\n",
2432                         options.pidfile);
2433                 kill(pid, SIGTERM);
2434                 exit(1);
2435         }
2436         fprintf(fp, "%d\n", pid);
2437         fclose(fp);
2438
2439         return 0;
2440 }