ctdb-tests: Implement STOP_NODE and CONTINUE_NODE controls in fake_ctdbd
[amitay/samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23
24 #include <popt.h>
25 #include <talloc.h>
26 #include <tevent.h>
27 #include <tdb.h>
28
29 #include "lib/util/dlinklist.h"
30 #include "lib/util/tevent_unix.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/async_req/async_sock.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37
38 #include "common/comm.h"
39 #include "common/system.h"
40 #include "common/logging.h"
41 #include "common/tunable.h"
42
43
/* Port number handed to parse_ip() whenever a node address is built */
#define CTDB_PORT 4379

/* A fake flag that is only supported by some functions.  It is set on
 * a node when its nodemap line carries the TIMEOUT token. */
#define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
48
/* One node of the fake cluster, as filled in by nodemap_parse() */
struct node {
        /* Address parsed from the IP field of the nodemap line */
        ctdb_sock_addr addr;
        /* PNN taken from the first field of the nodemap line */
        uint32_t pnn;
        /* Flags field of the nodemap line, plus NODE_FLAGS_FAKE_TIMEOUT
         * when the TIMEOUT token was given */
        uint32_t flags;
        /* CTDB_CAP_* bits; CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER unless
         * switched off on the nodemap line */
        uint32_t capabilities;
        /* While set on any node, recover_check() keeps postponing recovery */
        bool recovery_disabled;
        /* Initialised to NULL by nodemap_parse(); users are not visible
         * in this part of the file */
        void *recovery_substate;
};
57
/* All nodes of the fake cluster */
struct node_map {
        /* Number of entries in node[] */
        uint32_t num_nodes;
        /* talloc array of nodes, grown one entry at a time by nodemap_parse() */
        struct node *node;
        /* PNN of the node marked CURRENT; CTDB_UNKNOWN_PNN until one is seen */
        uint32_t pnn;
        /* PNN of the node marked RECMASTER; CTDB_UNKNOWN_PNN until one is seen */
        uint32_t recmaster;
};
64
/* One network interface, as read by interfaces_parse() */
struct interface {
        /* Interface name, a talloc copy of the first field */
        const char *name;
        /* True when the link state field parsed to a non-zero value */
        bool link_up;
        /* Value of the references field */
        uint32_t references;
};
70
/* All interfaces read by interfaces_parse() */
struct interface_map {
        /* Number of entries in iface[] */
        int num;
        /* talloc array of interfaces, grown one entry at a time */
        struct interface *iface;
};
75
/* Recovery mode, generation and VNN map of the fake cluster */
struct vnn_map {
        /* CTDB_RECOVERY_ACTIVE after vnnmap_init(); set to
         * CTDB_RECOVERY_NORMAL by ctdbd_setup() and recover_done() */
        uint32_t recmode;
        /* INVALID_GENERATION until parsed or generated */
        uint32_t generation;
        /* Number of entries in map[] */
        uint32_t size;
        /* talloc array with one value per line read by vnnmap_parse() */
        uint32_t *map;
};
82
/* A single registered srvid.
 * NOTE(review): prev/next look like lib/util/dlinklist.h list links and
 * the list heads appear to be the rstate members of ctdbd_context and
 * client_state - confirm against the code that registers srvids. */
struct srvid_register_state {
        struct srvid_register_state *prev, *next;
        /* Daemon context this registration refers to */
        struct ctdbd_context *ctdb;
        /* The srvid value */
        uint64_t srvid;
};
88
/* Complete state of the fake daemon, built by ctdbd_setup() */
struct ctdbd_context {
        /* Created by nodemap_init(), filled by the NODEMAP section */
        struct node_map *node_map;
        /* Created by interfaces_init(), filled by the IFACES section */
        struct interface_map *iface_map;
        /* Created by vnnmap_init(), filled by the VNNMAP section */
        struct vnn_map *vnn_map;
        /* NOTE(review): presumably the list of srvid registrations - confirm */
        struct srvid_register_state *rstate;
        /* Reported as the status of the ping control */
        int num_clients;
        /* Set once in ctdbd_setup() */
        struct timeval start_time;
        /* Set in ctdbd_setup() and whenever recover_check() lets a
         * recovery proceed */
        struct timeval recovery_start_time;
        /* Set in ctdbd_setup() and in recover_done() */
        struct timeval recovery_end_time;
        bool takeover_disabled;
        /* Initialised to DEBUG_ERR */
        enum debug_level log_level;
        /* Initialised to CTDB_RUNSTATE_RUNNING */
        enum ctdb_runstate runstate;
        /* Initialised with ctdb_tunable_set_defaults() */
        struct ctdb_tunable_list tun_list;
        /* Initialised to CTDB_MONITORING_ACTIVE */
        int monitoring_mode;
        /* Value from the RECLOCK section; NULL when that value is blank
         * or the section is absent */
        char *reclock;
};
105
106 /*
107  * Parse routines
108  */
109
110 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
111 {
112         struct node_map *node_map;
113
114         node_map = talloc_zero(mem_ctx, struct node_map);
115         if (node_map == NULL) {
116                 return NULL;
117         }
118
119         node_map->pnn = CTDB_UNKNOWN_PNN;
120         node_map->recmaster = CTDB_UNKNOWN_PNN;
121
122         return node_map;
123 }
124
125 /* Read a nodemap from stdin.  Each line looks like:
126  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
127  * EOF or a blank line terminates input.
128  *
129  * By default, capablities for each node are
130  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
131  * capabilities can be faked off by adding, for example,
132  * -CTDB_CAP_RECMASTER.
133  */
134
135 static bool nodemap_parse(struct node_map *node_map)
136 {
137         char line[1024];
138
139         while ((fgets(line, sizeof(line), stdin) != NULL)) {
140                 uint32_t pnn, flags, capabilities;
141                 char *tok, *t;
142                 char *ip;
143                 ctdb_sock_addr saddr;
144                 struct node *node;
145
146                 if (line[0] == '\n') {
147                         break;
148                 }
149
150                 /* Get rid of pesky newline */
151                 if ((t = strchr(line, '\n')) != NULL) {
152                         *t = '\0';
153                 }
154
155                 /* Get PNN */
156                 tok = strtok(line, " \t");
157                 if (tok == NULL) {
158                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
159                         continue;
160                 }
161                 pnn = (uint32_t)strtoul(tok, NULL, 0);
162
163                 /* Get IP */
164                 tok = strtok(NULL, " \t");
165                 if (tok == NULL) {
166                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
167                         continue;
168                 }
169                 if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
170                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
171                         continue;
172                 }
173                 ip = talloc_strdup(node_map, tok);
174                 if (ip == NULL) {
175                         goto fail;
176                 }
177
178                 /* Get flags */
179                 tok = strtok(NULL, " \t");
180                 if (tok == NULL) {
181                         fprintf(stderr, "bad line (%s) - missing flags\n",
182                                 line);
183                         continue;
184                 }
185                 flags = (uint32_t)strtoul(tok, NULL, 0);
186                 /* Handle deleted nodes */
187                 if (flags & NODE_FLAGS_DELETED) {
188                         talloc_free(ip);
189                         ip = talloc_strdup(node_map, "0.0.0.0");
190                         if (ip == NULL) {
191                                 goto fail;
192                         }
193                 }
194                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
195
196                 tok = strtok(NULL, " \t");
197                 while (tok != NULL) {
198                         if (strcmp(tok, "CURRENT") == 0) {
199                                 node_map->pnn = pnn;
200                         } else if (strcmp(tok, "RECMASTER") == 0) {
201                                 node_map->recmaster = pnn;
202                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
203                                 capabilities &= ~CTDB_CAP_RECMASTER;
204                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
205                                 capabilities &= ~CTDB_CAP_LMASTER;
206                         } else if (strcmp(tok, "TIMEOUT") == 0) {
207                                 /* This can be done with just a flag
208                                  * value but it is probably clearer
209                                  * and less error-prone to fake this
210                                  * with an explicit token */
211                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
212                         }
213                         tok = strtok(NULL, " \t");
214                 }
215
216                 node_map->node = talloc_realloc(node_map, node_map->node,
217                                                 struct node,
218                                                 node_map->num_nodes + 1);
219                 if (node_map->node == NULL) {
220                         goto fail;
221                 }
222                 node = &node_map->node[node_map->num_nodes];
223
224                 parse_ip(ip, NULL, CTDB_PORT, &node->addr);
225                 node->pnn = pnn;
226                 node->flags = flags;
227                 node->capabilities = capabilities;
228                 node->recovery_disabled = false;
229                 node->recovery_substate = NULL;
230
231                 node_map->num_nodes += 1;
232         }
233
234         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
235         return true;
236
237 fail:
238         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
239         return false;
240
241 }
242
243 /* Append a node to a node map with given address and flags */
244 static bool node_map_add(struct ctdb_node_map *nodemap,
245                          const char *nstr, uint32_t flags)
246 {
247         ctdb_sock_addr addr;
248         uint32_t num;
249         struct ctdb_node_and_flags *n;
250
251         if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
252                 fprintf(stderr, "Invalid IP address %s\n", nstr);
253                 return false;
254         }
255
256         num = nodemap->num;
257         nodemap->node = talloc_realloc(nodemap, nodemap->node,
258                                        struct ctdb_node_and_flags, num+1);
259         if (nodemap->node == NULL) {
260                 return false;
261         }
262
263         n = &nodemap->node[num];
264         n->addr = addr;
265         n->pnn = num;
266         n->flags = flags;
267
268         nodemap->num = num+1;
269         return true;
270 }
271
272 /* Read a nodes file into a node map */
273 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
274                                                   const char *nlist)
275 {
276         char **lines;
277         int nlines;
278         int i;
279         struct ctdb_node_map *nodemap;
280
281         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
282         if (nodemap == NULL) {
283                 return NULL;
284         }
285
286         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
287         if (lines == NULL) {
288                 return NULL;
289         }
290
291         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
292                 nlines--;
293         }
294
295         for (i=0; i<nlines; i++) {
296                 char *node;
297                 uint32_t flags;
298                 size_t len;
299
300                 node = lines[i];
301                 /* strip leading spaces */
302                 while((*node == ' ') || (*node == '\t')) {
303                         node++;
304                 }
305
306                 len = strlen(node);
307
308                 /* strip trailing spaces */
309                 while ((len > 1) &&
310                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
311                 {
312                         node[len-1] = '\0';
313                         len--;
314                 }
315
316                 if (len == 0) {
317                         continue;
318                 }
319                 if (*node == '#') {
320                         /* A "deleted" node is a node that is
321                            commented out in the nodes file.  This is
322                            used instead of removing a line, which
323                            would cause subsequent nodes to change
324                            their PNN. */
325                         flags = NODE_FLAGS_DELETED;
326                         node = discard_const("0.0.0.0");
327                 } else {
328                         flags = 0;
329                 }
330                 if (! node_map_add(nodemap, node, flags)) {
331                         talloc_free(lines);
332                         TALLOC_FREE(nodemap);
333                         return NULL;
334                 }
335         }
336
337         talloc_free(lines);
338         return nodemap;
339 }
340
341 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
342                                              uint32_t pnn)
343 {
344         struct ctdb_node_map *nodemap;
345         char nodepath[PATH_MAX];
346         const char *nodes_list;
347
348         /* read the nodes file */
349         sprintf(nodepath, "CTDB_NODES_%u", pnn);
350         nodes_list = getenv(nodepath);
351         if (nodes_list == NULL) {
352                 nodes_list = getenv("CTDB_NODES");
353                 if (nodes_list == NULL) {
354                         DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
355                         return NULL;
356                 }
357         }
358
359         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
360         if (nodemap == NULL) {
361                 DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
362                                    nodes_list));
363                 return NULL;
364         }
365
366         return nodemap;
367 }
368
369 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
370 {
371         struct interface_map *iface_map;
372
373         iface_map = talloc_zero(mem_ctx, struct interface_map);
374         if (iface_map == NULL) {
375                 return NULL;
376         }
377
378         return iface_map;
379 }
380
381 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
382  * output:
383  *   :Name:LinkStatus:References:
384  *   :eth2:1:4294967294
385  *   :eth1:1:4294967292
386  */
387
388 static bool interfaces_parse(struct interface_map *iface_map)
389 {
390         char line[1024];
391
392         while ((fgets(line, sizeof(line), stdin) != NULL)) {
393                 uint16_t link_state;
394                 uint32_t references;
395                 char *tok, *t, *name;
396                 struct interface *iface;
397
398                 if (line[0] == '\n') {
399                         break;
400                 }
401
402                 /* Get rid of pesky newline */
403                 if ((t = strchr(line, '\n')) != NULL) {
404                         *t = '\0';
405                 }
406
407                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
408                         continue;
409                 }
410
411                 /* Leading colon... */
412                 // tok = strtok(line, ":");
413
414                 /* name */
415                 tok = strtok(line, ":");
416                 if (tok == NULL) {
417                         fprintf(stderr, "bad line (%s) - missing name\n", line);
418                         continue;
419                 }
420                 name = tok;
421
422                 /* link_state */
423                 tok = strtok(NULL, ":");
424                 if (tok == NULL) {
425                         fprintf(stderr, "bad line (%s) - missing link state\n",
426                                 line);
427                         continue;
428                 }
429                 link_state = (uint16_t)strtoul(tok, NULL, 0);
430
431                 /* references... */
432                 tok = strtok(NULL, ":");
433                 if (tok == NULL) {
434                         fprintf(stderr, "bad line (%s) - missing references\n",
435                                 line);
436                         continue;
437                 }
438                 references = (uint32_t)strtoul(tok, NULL, 0);
439
440                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
441                                                   struct interface,
442                                                   iface_map->num + 1);
443                 if (iface_map->iface == NULL) {
444                         goto fail;
445                 }
446
447                 iface = &iface_map->iface[iface_map->num];
448
449                 iface->name = talloc_strdup(iface_map, name);
450                 if (iface->name == NULL) {
451                         goto fail;
452                 }
453                 iface->link_up = link_state;
454                 iface->references = references;
455
456                 iface_map->num += 1;
457         }
458
459         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
460         return true;
461
462 fail:
463         fprintf(stderr, "Parsing interfaces failed\n");
464         return false;
465 }
466
467 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
468 {
469         struct vnn_map *vnn_map;
470
471         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
472         if (vnn_map == NULL) {
473                 fprintf(stderr, "Memory error\n");
474                 return NULL;
475         }
476         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
477         vnn_map->generation = INVALID_GENERATION;
478
479         return vnn_map;
480 }
481
482 /* Read vnn map.
483  * output:
484  *   <GENERATION>
485  *   <LMASTER0>
486  *   <LMASTER1>
487  *   ...
488  */
489
490 static bool vnnmap_parse(struct vnn_map *vnn_map)
491 {
492         char line[1024];
493
494         while (fgets(line, sizeof(line), stdin) != NULL) {
495                 uint32_t n;
496                 char *t;
497
498                 if (line[0] == '\n') {
499                         break;
500                 }
501
502                 /* Get rid of pesky newline */
503                 if ((t = strchr(line, '\n')) != NULL) {
504                         *t = '\0';
505                 }
506
507                 n = (uint32_t) strtol(line, NULL, 0);
508
509                 /* generation */
510                 if (vnn_map->generation == INVALID_GENERATION) {
511                         vnn_map->generation = n;
512                         continue;
513                 }
514
515                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
516                                               vnn_map->size + 1);
517                 if (vnn_map->map == NULL) {
518                         fprintf(stderr, "Memory error\n");
519                         goto fail;
520                 }
521
522                 vnn_map->map[vnn_map->size] = n;
523                 vnn_map->size += 1;
524         }
525
526         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
527         return true;
528
529 fail:
530         fprintf(stderr, "Parsing vnnmap failed\n");
531         return false;
532 }
533
534 static bool reclock_parse(struct ctdbd_context *ctdb)
535 {
536         char line[1024];
537         char *t;
538
539         if (fgets(line, sizeof(line), stdin) == NULL) {
540                 goto fail;
541         }
542
543         if (line[0] == '\n') {
544                 /* Recovery lock remains unset */
545                 goto ok;
546         }
547
548         /* Get rid of pesky newline */
549         if ((t = strchr(line, '\n')) != NULL) {
550                 *t = '\0';
551         }
552
553         ctdb->reclock = talloc_strdup(ctdb, line);
554         if (ctdb->reclock == NULL) {
555                 goto fail;
556         }
557 ok:
558         /* Swallow possible blank line following section */
559         fgets(line, sizeof(line), stdin);
560         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
561         return true;
562
563 fail:
564         fprintf(stderr, "Parsing reclock failed\n");
565         return false;
566 }
567
568 /*
569  * CTDB context setup
570  */
571
572 static uint32_t new_generation(uint32_t old_generation)
573 {
574         uint32_t generation;
575
576         while (1) {
577                 generation = random();
578                 if (generation != INVALID_GENERATION &&
579                     generation != old_generation) {
580                         break;
581                 }
582         }
583
584         return generation;
585 }
586
587 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
588 {
589         struct ctdbd_context *ctdb;
590         char line[1024];
591         bool status;
592
593         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
594         if (ctdb == NULL) {
595                 return NULL;
596         }
597
598         ctdb->node_map = nodemap_init(ctdb);
599         if (ctdb->node_map == NULL) {
600                 goto fail;
601         }
602
603         ctdb->iface_map = interfaces_init(ctdb);
604         if (ctdb->iface_map == NULL) {
605                 goto fail;
606         }
607
608         ctdb->vnn_map = vnnmap_init(ctdb);
609         if (ctdb->vnn_map == NULL) {
610                 goto fail;
611         }
612
613         while (fgets(line, sizeof(line), stdin) != NULL) {
614                 char *t;
615
616                 if ((t = strchr(line, '\n')) != NULL) {
617                         *t = '\0';
618                 }
619
620                 if (strcmp(line, "NODEMAP") == 0) {
621                         status = nodemap_parse(ctdb->node_map);
622                 } else if (strcmp(line, "IFACES") == 0) {
623                         status = interfaces_parse(ctdb->iface_map);
624                 } else if (strcmp(line, "VNNMAP") == 0) {
625                         status = vnnmap_parse(ctdb->vnn_map);
626                 } else if (strcmp(line, "RECLOCK") == 0) {
627                         status = reclock_parse(ctdb);
628                 } else {
629                         fprintf(stderr, "Unknown line %s\n", line);
630                         status = false;
631                 }
632
633                 if (! status) {
634                         goto fail;
635                 }
636         }
637
638         ctdb->start_time = tevent_timeval_current();
639         ctdb->recovery_start_time = tevent_timeval_current();
640         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
641         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
642                 ctdb->vnn_map->generation =
643                         new_generation(ctdb->vnn_map->generation);
644         }
645         ctdb->recovery_end_time = tevent_timeval_current();
646
647         ctdb->log_level = DEBUG_ERR;
648         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
649
650         ctdb_tunable_set_defaults(&ctdb->tun_list);
651
652         ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;
653
654         return ctdb;
655
656 fail:
657         TALLOC_FREE(ctdb);
658         return NULL;
659 }
660
661 static bool ctdbd_verify(struct ctdbd_context *ctdb)
662 {
663         struct node *node;
664         int i;
665
666         if (ctdb->node_map->num_nodes == 0) {
667                 return true;
668         }
669
670         /* Make sure all the nodes are in order */
671         for (i=0; i<ctdb->node_map->num_nodes; i++) {
672                 node = &ctdb->node_map->node[i];
673                 if (node->pnn != i) {
674                         fprintf(stderr, "Expected node %u, found %u\n",
675                                 i, node->pnn);
676                         return false;
677                 }
678         }
679
680         node = &ctdb->node_map->node[ctdb->node_map->pnn];
681         if (node->flags & NODE_FLAGS_DISCONNECTED) {
682                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
683                 exit(0);
684         }
685
686         return true;
687 }
688
689 /*
690  * Doing a recovery
691  */
692
/* Per-request state of the recover_send()/recover_recv() computation */
struct recover_state {
        /* Event context used to schedule the wakeup timers */
        struct tevent_context *ev;
        /* Daemon context whose recovery state is updated */
        struct ctdbd_context *ctdb;
};
697
698 static int recover_check(struct tevent_req *req);
699 static void recover_wait_done(struct tevent_req *subreq);
700 static void recover_done(struct tevent_req *subreq);
701
702 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
703                                        struct tevent_context *ev,
704                                        struct ctdbd_context *ctdb)
705 {
706         struct tevent_req *req;
707         struct recover_state *state;
708         int ret;
709
710         req = tevent_req_create(mem_ctx, &state, struct recover_state);
711         if (req == NULL) {
712                 return NULL;
713         }
714
715         state->ev = ev;
716         state->ctdb = ctdb;
717
718         ret = recover_check(req);
719         if (ret != 0) {
720                 tevent_req_error(req, ret);
721                 return tevent_req_post(req, ev);
722         }
723
724         return req;
725 }
726
727 static int recover_check(struct tevent_req *req)
728 {
729         struct recover_state *state = tevent_req_data(
730                 req, struct recover_state);
731         struct ctdbd_context *ctdb = state->ctdb;
732         struct tevent_req *subreq;
733         bool recovery_disabled;
734         int i;
735
736         recovery_disabled = false;
737         for (i=0; i<ctdb->node_map->num_nodes; i++) {
738                 if (ctdb->node_map->node[i].recovery_disabled) {
739                         recovery_disabled = true;
740                         break;
741                 }
742         }
743
744         subreq = tevent_wakeup_send(state, state->ev,
745                                     tevent_timeval_current_ofs(1, 0));
746         if (subreq == NULL) {
747                 return ENOMEM;
748         }
749
750         if (recovery_disabled) {
751                 tevent_req_set_callback(subreq, recover_wait_done, req);
752         } else {
753                 ctdb->recovery_start_time = tevent_timeval_current();
754                 tevent_req_set_callback(subreq, recover_done, req);
755         }
756
757         return 0;
758 }
759
760 static void recover_wait_done(struct tevent_req *subreq)
761 {
762         struct tevent_req *req = tevent_req_callback_data(
763                 subreq, struct tevent_req);
764         int ret;
765         bool status;
766
767         status = tevent_wakeup_recv(subreq);
768         TALLOC_FREE(subreq);
769         if (! status) {
770                 tevent_req_error(req, EIO);
771                 return;
772         }
773
774         ret = recover_check(req);
775         if (ret != 0) {
776                 tevent_req_error(req, ret);
777         }
778 }
779
780 static void recover_done(struct tevent_req *subreq)
781 {
782         struct tevent_req *req = tevent_req_callback_data(
783                 subreq, struct tevent_req);
784         struct recover_state *state = tevent_req_data(
785                 req, struct recover_state);
786         struct ctdbd_context *ctdb = state->ctdb;
787         bool status;
788
789         status = tevent_wakeup_recv(subreq);
790         TALLOC_FREE(subreq);
791         if (! status) {
792                 tevent_req_error(req, EIO);
793                 return;
794         }
795
796         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
797         ctdb->recovery_end_time = tevent_timeval_current();
798         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
799
800         tevent_req_done(req);
801 }
802
/* Collect the result of a recovery request.  Returns true on success.
 * On failure returns false and, if perr is not NULL, stores the error
 * code in *perr. */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
816
817 /*
818  * Routines for ctdb_req_header
819  */
820
821 static void header_fix_pnn(struct ctdb_req_header *header,
822                            struct ctdbd_context *ctdb)
823 {
824         if (header->srcnode == CTDB_CURRENT_NODE) {
825                 header->srcnode = ctdb->node_map->pnn;
826         }
827
828         if (header->destnode == CTDB_CURRENT_NODE) {
829                 header->destnode = ctdb->node_map->pnn;
830         }
831 }
832
833 static struct ctdb_req_header header_reply_control(
834                                         struct ctdb_req_header *header,
835                                         struct ctdbd_context *ctdb)
836 {
837         struct ctdb_req_header reply_header;
838
839         reply_header = (struct ctdb_req_header) {
840                 .ctdb_magic = CTDB_MAGIC,
841                 .ctdb_version = CTDB_PROTOCOL,
842                 .generation = ctdb->vnn_map->generation,
843                 .operation = CTDB_REPLY_CONTROL,
844                 .destnode = header->srcnode,
845                 .srcnode = header->destnode,
846                 .reqid = header->reqid,
847         };
848
849         return reply_header;
850 }
851
852 static struct ctdb_req_header header_reply_message(
853                                         struct ctdb_req_header *header,
854                                         struct ctdbd_context *ctdb)
855 {
856         struct ctdb_req_header reply_header;
857
858         reply_header = (struct ctdb_req_header) {
859                 .ctdb_magic = CTDB_MAGIC,
860                 .ctdb_version = CTDB_PROTOCOL,
861                 .generation = ctdb->vnn_map->generation,
862                 .operation = CTDB_REQ_MESSAGE,
863                 .destnode = header->srcnode,
864                 .srcnode = header->destnode,
865                 .reqid = 0,
866         };
867
868         return reply_header;
869 }
870
871 /*
872  * Client state
873  */
874
/* Per-client state.  The reply routines obtain it from the client's
 * request with tevent_req_data(). */
struct client_state {
        /* Event context used when writing replies */
        struct tevent_context *ev;
        /* NOTE(review): presumably the client socket - confirm */
        int fd;
        /* The fake daemon this client talks to */
        struct ctdbd_context *ctdb;
        int pnn;
        /* Comm context that replies are written to */
        struct comm_context *comm;
        /* NOTE(review): presumably this client's srvid registrations - confirm */
        struct srvid_register_state *rstate;
        int status;
};
884
885 /*
886  * Send replies to controls and messages
887  */
888
889 static void client_reply_done(struct tevent_req *subreq);
890
891 static void client_send_message(struct tevent_req *req,
892                                 struct ctdb_req_header *header,
893                                 struct ctdb_req_message_data *message)
894 {
895         struct client_state *state = tevent_req_data(
896                 req, struct client_state);
897         struct ctdbd_context *ctdb = state->ctdb;
898         struct tevent_req *subreq;
899         struct ctdb_req_header reply_header;
900         uint8_t *buf;
901         size_t datalen, buflen;
902         int ret;
903
904         reply_header = header_reply_message(header, ctdb);
905
906         datalen = ctdb_req_message_data_len(&reply_header, message);
907         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
908         if (ret != 0) {
909                 tevent_req_error(req, ret);
910                 return;
911         }
912
913         ret = ctdb_req_message_data_push(&reply_header, message,
914                                          buf, &buflen);
915         if (ret != 0) {
916                 tevent_req_error(req, ret);
917                 return;
918         }
919
920         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
921
922         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
923         if (tevent_req_nomem(subreq, req)) {
924                 return;
925         }
926         tevent_req_set_callback(subreq, client_reply_done, req);
927
928         talloc_steal(subreq, buf);
929 }
930
931 static void client_send_control(struct tevent_req *req,
932                                 struct ctdb_req_header *header,
933                                 struct ctdb_reply_control *reply)
934 {
935         struct client_state *state = tevent_req_data(
936                 req, struct client_state);
937         struct ctdbd_context *ctdb = state->ctdb;
938         struct tevent_req *subreq;
939         struct ctdb_req_header reply_header;
940         uint8_t *buf;
941         size_t datalen, buflen;
942         int ret;
943
944         reply_header = header_reply_control(header, ctdb);
945
946         datalen = ctdb_reply_control_len(&reply_header, reply);
947         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
948         if (ret != 0) {
949                 tevent_req_error(req, ret);
950                 return;
951         }
952
953         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
954         if (ret != 0) {
955                 tevent_req_error(req, ret);
956                 return;
957         }
958
959         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
960
961         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
962         if (tevent_req_nomem(subreq, req)) {
963                 return;
964         }
965         tevent_req_set_callback(subreq, client_reply_done, req);
966
967         talloc_steal(subreq, buf);
968 }
969
970 static void client_reply_done(struct tevent_req *subreq)
971 {
972         struct tevent_req *req = tevent_req_callback_data(
973                 subreq, struct tevent_req);
974         int ret;
975         bool status;
976
977         status = comm_write_recv(subreq, &ret);
978         TALLOC_FREE(subreq);
979         if (! status) {
980                 tevent_req_error(req, ret);
981         }
982 }
983
984 /*
985  * Handling protocol - controls
986  */
987
988 static void control_process_exists(TALLOC_CTX *mem_ctx,
989                                    struct tevent_req *req,
990                                    struct ctdb_req_header *header,
991                                    struct ctdb_req_control *request)
992 {
993         struct ctdb_reply_control reply;
994
995         reply.rdata.opcode = request->opcode;
996         reply.status = kill(request->rdata.data.pid, 0);
997         reply.errmsg = NULL;
998
999         client_send_control(req, header, &reply);
1000 }
1001
1002 static void control_ping(TALLOC_CTX *mem_ctx,
1003                          struct tevent_req *req,
1004                          struct ctdb_req_header *header,
1005                          struct ctdb_req_control *request)
1006 {
1007         struct client_state *state = tevent_req_data(
1008                 req, struct client_state);
1009         struct ctdbd_context *ctdb = state->ctdb;
1010         struct ctdb_reply_control reply;
1011
1012         reply.rdata.opcode = request->opcode;
1013         reply.status = ctdb->num_clients;
1014         reply.errmsg = NULL;
1015
1016         client_send_control(req, header, &reply);
1017 }
1018
1019 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1020                               struct tevent_req *req,
1021                               struct ctdb_req_header *header,
1022                               struct ctdb_req_control *request)
1023 {
1024         struct client_state *state = tevent_req_data(
1025                 req, struct client_state);
1026         struct ctdbd_context *ctdb = state->ctdb;
1027         struct ctdb_reply_control reply;
1028         struct ctdb_vnn_map *vnnmap;
1029
1030         reply.rdata.opcode = request->opcode;
1031
1032         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1033         if (vnnmap == NULL) {
1034                 reply.status = ENOMEM;
1035                 reply.errmsg = "Memory error";
1036         } else {
1037                 vnnmap->generation = ctdb->vnn_map->generation;
1038                 vnnmap->size = ctdb->vnn_map->size;
1039                 vnnmap->map = ctdb->vnn_map->map;
1040
1041                 reply.rdata.data.vnnmap = vnnmap;
1042                 reply.status = 0;
1043                 reply.errmsg = NULL;
1044         }
1045
1046         client_send_control(req, header, &reply);
1047 }
1048
1049 static void control_get_debug(TALLOC_CTX *mem_ctx,
1050                               struct tevent_req *req,
1051                               struct ctdb_req_header *header,
1052                               struct ctdb_req_control *request)
1053 {
1054         struct client_state *state = tevent_req_data(
1055                 req, struct client_state);
1056         struct ctdbd_context *ctdb = state->ctdb;
1057         struct ctdb_reply_control reply;
1058
1059         reply.rdata.opcode = request->opcode;
1060         reply.rdata.data.loglevel = debug_level_to_int(ctdb->log_level);
1061         reply.status = 0;
1062         reply.errmsg = NULL;
1063
1064         client_send_control(req, header, &reply);
1065 }
1066
1067 static void control_set_debug(TALLOC_CTX *mem_ctx,
1068                               struct tevent_req *req,
1069                               struct ctdb_req_header *header,
1070                               struct ctdb_req_control *request)
1071 {
1072         struct client_state *state = tevent_req_data(
1073                 req, struct client_state);
1074         struct ctdbd_context *ctdb = state->ctdb;
1075         struct ctdb_reply_control reply;
1076
1077         ctdb->log_level = debug_level_from_int(request->rdata.data.loglevel);
1078
1079         reply.rdata.opcode = request->opcode;
1080         reply.status = 0;
1081         reply.errmsg = NULL;
1082
1083         client_send_control(req, header, &reply);
1084 }
1085
1086 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1087                                 struct tevent_req *req,
1088                                 struct ctdb_req_header *header,
1089                                 struct ctdb_req_control *request)
1090 {
1091         struct client_state *state = tevent_req_data(
1092                 req, struct client_state);
1093         struct ctdbd_context *ctdb = state->ctdb;
1094         struct ctdb_reply_control reply;
1095
1096         reply.rdata.opcode = request->opcode;
1097         reply.status = ctdb->vnn_map->recmode;
1098         reply.errmsg = NULL;
1099
1100         client_send_control(req, header, &reply);
1101 }
1102
1103 struct set_recmode_state {
1104         struct tevent_req *req;
1105         struct ctdbd_context *ctdb;
1106         struct ctdb_req_header header;
1107         struct ctdb_reply_control reply;
1108 };
1109
1110 static void set_recmode_callback(struct tevent_req *subreq)
1111 {
1112         struct set_recmode_state *substate = tevent_req_callback_data(
1113                 subreq, struct set_recmode_state);
1114         bool status;
1115         int ret;
1116
1117         status = recover_recv(subreq, &ret);
1118         TALLOC_FREE(subreq);
1119         if (! status) {
1120                 substate->reply.status = ret;
1121                 substate->reply.errmsg = "recovery failed";
1122         } else {
1123                 substate->reply.status = 0;
1124                 substate->reply.errmsg = NULL;
1125         }
1126
1127         client_send_control(substate->req, &substate->header, &substate->reply);
1128         talloc_free(substate);
1129 }
1130
1131 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1132                                 struct tevent_req *req,
1133                                 struct ctdb_req_header *header,
1134                                 struct ctdb_req_control *request)
1135 {
1136         struct client_state *state = tevent_req_data(
1137                 req, struct client_state);
1138         struct tevent_req *subreq;
1139         struct ctdbd_context *ctdb = state->ctdb;
1140         struct set_recmode_state *substate;
1141         struct ctdb_reply_control reply;
1142
1143         reply.rdata.opcode = request->opcode;
1144
1145         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1146                 reply.status = -1;
1147                 reply.errmsg = "Client cannot set recmode to NORMAL";
1148                 goto fail;
1149         }
1150
1151         substate = talloc_zero(ctdb, struct set_recmode_state);
1152         if (substate == NULL) {
1153                 reply.status = -1;
1154                 reply.errmsg = "Memory error";
1155                 goto fail;
1156         }
1157
1158         substate->req = req;
1159         substate->ctdb = ctdb;
1160         substate->header = *header;
1161         substate->reply.rdata.opcode = request->opcode;
1162
1163         subreq = recover_send(substate, state->ev, state->ctdb);
1164         if (subreq == NULL) {
1165                 talloc_free(substate);
1166                 goto fail;
1167         }
1168         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1169
1170         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1171         return;
1172
1173 fail:
1174         client_send_control(req, header, &reply);
1175
1176 }
1177
1178 static int srvid_register_state_destructor(struct srvid_register_state *rstate)
1179 {
1180         DLIST_REMOVE(rstate->ctdb->rstate, rstate);
1181         return 0;
1182 }
1183
1184 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1185                                    struct tevent_req *req,
1186                                    struct ctdb_req_header *header,
1187                                    struct ctdb_req_control *request)
1188 {
1189         struct client_state *state = tevent_req_data(
1190                 req, struct client_state);
1191         struct ctdbd_context *ctdb = state->ctdb;
1192         struct ctdb_reply_control reply;
1193         struct srvid_register_state *rstate;
1194
1195         reply.rdata.opcode = request->opcode;
1196
1197         rstate = talloc_zero(ctdb, struct srvid_register_state);
1198         if (rstate == NULL) {
1199                 reply.status = -1;
1200                 reply.errmsg = "Memory error";
1201                 goto fail;
1202         }
1203         rstate->ctdb = ctdb;
1204         rstate->srvid = request->srvid;
1205
1206         talloc_set_destructor(rstate, srvid_register_state_destructor);
1207
1208         DLIST_ADD_END(ctdb->rstate, rstate);
1209
1210         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", rstate->srvid));
1211
1212         reply.status = 0;
1213         reply.errmsg = NULL;
1214
1215 fail:
1216         client_send_control(req, header, &reply);
1217 }
1218
1219 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1220                                      struct tevent_req *req,
1221                                      struct ctdb_req_header *header,
1222                                      struct ctdb_req_control *request)
1223 {
1224         struct client_state *state = tevent_req_data(
1225                 req, struct client_state);
1226         struct ctdbd_context *ctdb = state->ctdb;
1227         struct ctdb_reply_control reply;
1228         struct srvid_register_state *rstate = NULL;
1229
1230         reply.rdata.opcode = request->opcode;
1231
1232         for (rstate = ctdb->rstate; rstate != NULL; rstate = rstate->next) {
1233                 if (rstate->srvid == request->srvid) {
1234                         break;
1235                 }
1236         }
1237
1238         if (rstate == NULL) {
1239                 reply.status = -1;
1240                 reply.errmsg = "srvid not registered";
1241                 goto fail;
1242         }
1243
1244         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", rstate->srvid));
1245         talloc_free(rstate);
1246
1247         reply.status = 0;
1248         reply.errmsg = NULL;
1249
1250         client_send_control(req, header, &reply);
1251         return;
1252
1253 fail:
1254         TALLOC_FREE(rstate);
1255         client_send_control(req, header, &reply);
1256 }
1257
1258 static void control_get_pid(TALLOC_CTX *mem_ctx,
1259                             struct tevent_req *req,
1260                             struct ctdb_req_header *header,
1261                             struct ctdb_req_control *request)
1262 {
1263         struct ctdb_reply_control reply;
1264
1265         reply.rdata.opcode = request->opcode;
1266         reply.status = getpid();
1267         reply.errmsg = NULL;
1268
1269         client_send_control(req, header, &reply);
1270 }
1271
1272 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1273                                   struct tevent_req *req,
1274                                   struct ctdb_req_header *header,
1275                                   struct ctdb_req_control *request)
1276 {
1277         struct client_state *state = tevent_req_data(
1278                 req, struct client_state);
1279         struct ctdbd_context *ctdb = state->ctdb;
1280         struct ctdb_reply_control reply;
1281
1282         reply.rdata.opcode = request->opcode;
1283         reply.status = ctdb->node_map->recmaster;
1284         reply.errmsg = NULL;
1285
1286         client_send_control(req, header, &reply);
1287 }
1288
1289 static void control_get_pnn(TALLOC_CTX *mem_ctx,
1290                             struct tevent_req *req,
1291                             struct ctdb_req_header *header,
1292                             struct ctdb_req_control *request)
1293 {
1294         struct ctdb_reply_control reply;
1295
1296         reply.rdata.opcode = request->opcode;
1297         reply.status = header->destnode;
1298         reply.errmsg = NULL;
1299
1300         client_send_control(req, header, &reply);
1301 }
1302
1303 static void control_shutdown(TALLOC_CTX *mem_ctx,
1304                              struct tevent_req *req,
1305                              struct ctdb_req_header *hdr,
1306                              struct ctdb_req_control *request)
1307 {
1308         struct client_state *state = tevent_req_data(
1309                 req, struct client_state);
1310
1311         state->status = 99;
1312 }
1313
1314 static void control_get_monmode(TALLOC_CTX *mem_ctx,
1315                                 struct tevent_req *req,
1316                                 struct ctdb_req_header *header,
1317                                 struct ctdb_req_control *request)
1318 {
1319         struct client_state *state = tevent_req_data(
1320                 req, struct client_state);
1321         struct ctdbd_context *ctdb = state->ctdb;
1322         struct ctdb_reply_control reply;
1323
1324         reply.rdata.opcode = request->opcode;
1325         reply.status = ctdb->monitoring_mode;
1326         reply.errmsg = NULL;
1327
1328         client_send_control(req, header, &reply);
1329 }
1330
1331 static void control_set_tunable(TALLOC_CTX *mem_ctx,
1332                                 struct tevent_req *req,
1333                                 struct ctdb_req_header *header,
1334                                 struct ctdb_req_control *request)
1335 {
1336         struct client_state *state = tevent_req_data(
1337                 req, struct client_state);
1338         struct ctdbd_context *ctdb = state->ctdb;
1339         struct ctdb_reply_control reply;
1340         bool ret, obsolete;
1341
1342         reply.rdata.opcode = request->opcode;
1343         reply.errmsg = NULL;
1344
1345         ret = ctdb_tunable_set_value(&ctdb->tun_list,
1346                                      request->rdata.data.tunable->name,
1347                                      request->rdata.data.tunable->value,
1348                                      &obsolete);
1349         if (! ret) {
1350                 reply.status = -1;
1351         } else if (obsolete) {
1352                 reply.status = 1;
1353         } else {
1354                 reply.status = 0;
1355         }
1356
1357         client_send_control(req, header, &reply);
1358 }
1359
1360 static void control_get_tunable(TALLOC_CTX *mem_ctx,
1361                                 struct tevent_req *req,
1362                                 struct ctdb_req_header *header,
1363                                 struct ctdb_req_control *request)
1364 {
1365         struct client_state *state = tevent_req_data(
1366                 req, struct client_state);
1367         struct ctdbd_context *ctdb = state->ctdb;
1368         struct ctdb_reply_control reply;
1369         uint32_t value;
1370         bool ret;
1371
1372         reply.rdata.opcode = request->opcode;
1373         reply.errmsg = NULL;
1374
1375         ret = ctdb_tunable_get_value(&ctdb->tun_list,
1376                                      request->rdata.data.tun_var, &value);
1377         if (! ret) {
1378                 reply.status = -1;
1379         } else {
1380                 reply.rdata.data.tun_value = value;
1381                 reply.status = 0;
1382         }
1383
1384         client_send_control(req, header, &reply);
1385 }
1386
1387 static void control_list_tunables(TALLOC_CTX *mem_ctx,
1388                                   struct tevent_req *req,
1389                                   struct ctdb_req_header *header,
1390                                   struct ctdb_req_control *request)
1391 {
1392         struct ctdb_reply_control reply;
1393         struct ctdb_var_list *var_list;
1394
1395         reply.rdata.opcode = request->opcode;
1396         reply.errmsg = NULL;
1397
1398         var_list = ctdb_tunable_names(mem_ctx);
1399         if (var_list == NULL) {
1400                 reply.status = -1;
1401         } else {
1402                 reply.rdata.data.tun_var_list = var_list;
1403                 reply.status = 0;
1404         }
1405
1406         client_send_control(req, header, &reply);
1407 }
1408
1409 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
1410                                      struct tevent_req *req,
1411                                      struct ctdb_req_header *header,
1412                                      struct ctdb_req_control *request)
1413 {
1414         struct client_state *state = tevent_req_data(
1415                 req, struct client_state);
1416         struct ctdbd_context *ctdb = state->ctdb;
1417         struct ctdb_reply_control reply;
1418
1419         reply.rdata.opcode = request->opcode;
1420         reply.rdata.data.tun_list = &ctdb->tun_list;
1421         reply.status = 0;
1422         reply.errmsg = NULL;
1423
1424         client_send_control(req, header, &reply);
1425 }
1426
1427 static void control_uptime(TALLOC_CTX *mem_ctx,
1428                            struct tevent_req *req,
1429                            struct ctdb_req_header *header,
1430                            struct ctdb_req_control *request)
1431 {
1432         struct client_state *state = tevent_req_data(
1433                 req, struct client_state);
1434         struct ctdbd_context *ctdb = state->ctdb;
1435         struct ctdb_reply_control reply;
1436         struct ctdb_uptime *uptime;;
1437
1438         reply.rdata.opcode = request->opcode;
1439
1440         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
1441         if (uptime == NULL) {
1442                 goto fail;
1443         }
1444
1445         uptime->current_time = tevent_timeval_current();
1446         uptime->ctdbd_start_time = ctdb->start_time;
1447         uptime->last_recovery_started = ctdb->recovery_start_time;
1448         uptime->last_recovery_finished = ctdb->recovery_end_time;
1449
1450         reply.rdata.data.uptime = uptime;
1451         reply.status = 0;
1452         reply.errmsg = NULL;
1453         client_send_control(req, header, &reply);
1454         return;
1455
1456 fail:
1457         reply.status = -1;
1458         reply.errmsg = "Memory error";
1459         client_send_control(req, header, &reply);
1460 }
1461
1462 static void control_enable_monitor(TALLOC_CTX *mem_ctx,
1463                                    struct tevent_req *req,
1464                                    struct ctdb_req_header *header,
1465                                    struct ctdb_req_control *request)
1466 {
1467         struct client_state *state = tevent_req_data(
1468                 req, struct client_state);
1469         struct ctdbd_context *ctdb = state->ctdb;
1470         struct ctdb_reply_control reply;
1471
1472         ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;
1473
1474         reply.rdata.opcode = request->opcode;
1475         reply.status = 0;
1476         reply.errmsg = NULL;
1477         client_send_control(req, header, &reply);
1478 }
1479
1480 static void control_disable_monitor(TALLOC_CTX *mem_ctx,
1481                                     struct tevent_req *req,
1482                                     struct ctdb_req_header *header,
1483                                     struct ctdb_req_control *request)
1484 {
1485         struct client_state *state = tevent_req_data(
1486                 req, struct client_state);
1487         struct ctdbd_context *ctdb = state->ctdb;
1488         struct ctdb_reply_control reply;
1489
1490         ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
1491
1492         reply.rdata.opcode = request->opcode;
1493         reply.status = 0;
1494         reply.errmsg = NULL;
1495         client_send_control(req, header, &reply);
1496 }
1497
1498 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
1499                                       struct tevent_req *req,
1500                                       struct ctdb_req_header *header,
1501                                       struct ctdb_req_control *request)
1502 {
1503         struct client_state *state = tevent_req_data(
1504                 req, struct client_state);
1505         struct ctdbd_context *ctdb = state->ctdb;
1506         struct ctdb_reply_control reply;
1507         struct ctdb_node_map *nodemap;
1508         struct node_map *node_map = ctdb->node_map;
1509         int i;
1510
1511         reply.rdata.opcode = request->opcode;
1512
1513         nodemap = read_nodes_file(mem_ctx, header->destnode);
1514         if (nodemap == NULL) {
1515                 goto fail;
1516         }
1517
1518         for (i=0; i<nodemap->num; i++) {
1519                 struct node *node;
1520
1521                 if (i < node_map->num_nodes &&
1522                     ctdb_sock_addr_same(&nodemap->node[i].addr,
1523                                         &node_map->node[i].addr)) {
1524                         continue;
1525                 }
1526
1527                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
1528                         node = &node_map->node[i];
1529
1530                         node->flags |= NODE_FLAGS_DELETED;
1531                         parse_ip("0.0.0.0", NULL, 0, &node->addr);
1532
1533                         continue;
1534                 }
1535
1536                 if (i < node_map->num_nodes &&
1537                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
1538                         node = &node_map->node[i];
1539
1540                         node->flags &= ~NODE_FLAGS_DELETED;
1541                         node->addr = nodemap->node[i].addr;
1542
1543                         continue;
1544                 }
1545
1546                 node_map->node = talloc_realloc(node_map, node_map->node,
1547                                                 struct node,
1548                                                 node_map->num_nodes+1);
1549                 if (node_map->node == NULL) {
1550                         goto fail;
1551                 }
1552                 node = &node_map->node[node_map->num_nodes];
1553
1554                 node->addr = nodemap->node[i].addr;
1555                 node->pnn = nodemap->node[i].pnn;
1556                 node->flags = 0;
1557                 node->capabilities = CTDB_CAP_DEFAULT;
1558                 node->recovery_disabled = false;
1559                 node->recovery_substate = NULL;
1560
1561                 node_map->num_nodes += 1;
1562         }
1563
1564         talloc_free(nodemap);
1565
1566         reply.status = 0;
1567         reply.errmsg = NULL;
1568         client_send_control(req, header, &reply);
1569         return;
1570
1571 fail:
1572         reply.status = -1;
1573         reply.errmsg = "Memory error";
1574         client_send_control(req, header, &reply);
1575 }
1576
1577 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
1578                                      struct tevent_req *req,
1579                                      struct ctdb_req_header *header,
1580                                      struct ctdb_req_control *request)
1581 {
1582         struct client_state *state = tevent_req_data(
1583                 req, struct client_state);
1584         struct ctdbd_context *ctdb = state->ctdb;
1585         struct ctdb_reply_control reply;
1586         struct node *node;
1587         uint32_t caps = 0;
1588
1589         reply.rdata.opcode = request->opcode;
1590
1591         node = &ctdb->node_map->node[header->destnode];
1592         caps = node->capabilities;
1593
1594         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
1595                 /* Don't send reply */
1596                 return;
1597         }
1598
1599         reply.rdata.data.caps = caps;
1600         reply.status = 0;
1601         reply.errmsg = NULL;
1602
1603         client_send_control(req, header, &reply);
1604 }
1605
1606 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
1607                                 struct tevent_req *req,
1608                                 struct ctdb_req_header *header,
1609                                 struct ctdb_req_control *request)
1610 {
1611         struct client_state *state = tevent_req_data(
1612                 req, struct client_state);
1613         struct ctdbd_context *ctdb = state->ctdb;
1614         struct ctdb_reply_control reply;
1615         struct ctdb_node_map *nodemap;
1616         struct node *node;
1617         int i;
1618
1619         reply.rdata.opcode = request->opcode;
1620
1621         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
1622         if (nodemap == NULL) {
1623                 goto fail;
1624         }
1625
1626         nodemap->num = ctdb->node_map->num_nodes;
1627         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
1628                                      nodemap->num);
1629         if (nodemap->node == NULL) {
1630                 goto fail;
1631         }
1632
1633         for (i=0; i<nodemap->num; i++) {
1634                 node = &ctdb->node_map->node[i];
1635                 nodemap->node[i] = (struct ctdb_node_and_flags) {
1636                         .pnn = node->pnn,
1637                         .flags = node->flags,
1638                         .addr = node->addr,
1639                 };
1640         }
1641
1642         reply.rdata.data.nodemap = nodemap;
1643         reply.status = 0;
1644         reply.errmsg = NULL;
1645         client_send_control(req, header, &reply);
1646         return;
1647
1648 fail:
1649         reply.status = -1;
1650         reply.errmsg = "Memory error";
1651         client_send_control(req, header, &reply);
1652 }
1653
1654 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
1655                                      struct tevent_req *req,
1656                                      struct ctdb_req_header *header,
1657                                      struct ctdb_req_control *request)
1658 {
1659         struct client_state *state = tevent_req_data(
1660                 req, struct client_state);
1661         struct ctdbd_context *ctdb = state->ctdb;
1662         struct ctdb_reply_control reply;
1663
1664         reply.rdata.opcode = request->opcode;
1665         if (ctdb->reclock != NULL) {
1666                 reply.rdata.data.reclock_file =
1667                         talloc_strdup(mem_ctx, ctdb->reclock);
1668                 if (reply.rdata.data.reclock_file == NULL) {
1669                         reply.status = ENOMEM;
1670                         reply.errmsg = "Memory error";
1671                         goto done;
1672                 }
1673         } else {
1674                 reply.rdata.data.reclock_file = NULL;
1675         }
1676
1677         reply.status = 0;
1678         reply.errmsg = NULL;
1679
1680 done:
1681         client_send_control(req, header, &reply);
1682 }
1683
1684 static void control_stop_node(TALLOC_CTX *mem_ctx,
1685                               struct tevent_req *req,
1686                               struct ctdb_req_header *header,
1687                               struct ctdb_req_control *request)
1688 {
1689         struct client_state *state = tevent_req_data(
1690                 req, struct client_state);
1691         struct ctdbd_context *ctdb = state->ctdb;
1692         struct ctdb_reply_control reply;
1693
1694         reply.rdata.opcode = request->opcode;
1695
1696         DEBUG(DEBUG_INFO, ("Stopping node\n"));
1697         ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
1698         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
1699
1700         reply.status = 0;
1701         reply.errmsg = NULL;
1702
1703         client_send_control(req, header, &reply);
1704         return;
1705 }
1706
1707 static void control_continue_node(TALLOC_CTX *mem_ctx,
1708                                   struct tevent_req *req,
1709                                   struct ctdb_req_header *header,
1710                                   struct ctdb_req_control *request)
1711 {
1712         struct client_state *state = tevent_req_data(
1713                 req, struct client_state);
1714         struct ctdbd_context *ctdb = state->ctdb;
1715         struct ctdb_reply_control reply;
1716
1717         reply.rdata.opcode = request->opcode;
1718
1719         DEBUG(DEBUG_INFO, ("Continue node\n"));
1720         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
1721
1722         reply.status = 0;
1723         reply.errmsg = NULL;
1724
1725         client_send_control(req, header, &reply);
1726         return;
1727 }
1728
1729 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
1730                                struct tevent_req *req,
1731                                struct ctdb_req_header *header,
1732                                struct ctdb_req_control *request)
1733 {
1734         struct client_state *state = tevent_req_data(
1735                 req, struct client_state);
1736         struct ctdbd_context *ctdb = state->ctdb;
1737         struct ctdb_reply_control reply;
1738         struct ctdb_iface_list *iface_list;
1739         struct interface *iface;
1740         int i;
1741
1742         reply.rdata.opcode = request->opcode;
1743
1744         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
1745         if (iface_list == NULL) {
1746                 goto fail;
1747         }
1748
1749         iface_list->num = ctdb->iface_map->num;
1750         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
1751                                          iface_list->num);
1752         if (iface_list->iface == NULL) {
1753                 goto fail;
1754         }
1755
1756         for (i=0; i<iface_list->num; i++) {
1757                 iface = &ctdb->iface_map->iface[i];
1758                 iface_list->iface[i] = (struct ctdb_iface) {
1759                         .link_state = iface->link_up,
1760                         .references = iface->references,
1761                 };
1762                 strncpy(iface_list->iface[i].name, iface->name,
1763                         CTDB_IFACE_SIZE+2);
1764         }
1765
1766         reply.rdata.data.iface_list = iface_list;
1767         reply.status = 0;
1768         reply.errmsg = NULL;
1769         client_send_control(req, header, &reply);
1770         return;
1771
1772 fail:
1773         reply.status = -1;
1774         reply.errmsg = "Memory error";
1775         client_send_control(req, header, &reply);
1776 }
1777
1778 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
1779                                          struct tevent_req *req,
1780                                          struct ctdb_req_header *header,
1781                                          struct ctdb_req_control *request)
1782 {
1783         struct client_state *state = tevent_req_data(
1784                 req, struct client_state);
1785         struct ctdbd_context *ctdb = state->ctdb;
1786         struct ctdb_reply_control reply;
1787         struct ctdb_iface *in_iface;
1788         struct interface *iface = NULL;
1789         bool link_up = false;
1790         int i;
1791
1792         reply.rdata.opcode = request->opcode;
1793
1794         in_iface = request->rdata.data.iface;
1795
1796         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
1797                 reply.errmsg = "interface name not terminated";
1798                 goto fail;
1799         }
1800
1801         switch (in_iface->link_state) {
1802                 case 0:
1803                         link_up = false;
1804                         break;
1805
1806                 case 1:
1807                         link_up = true;
1808                         break;
1809
1810                 default:
1811                         reply.errmsg = "invalid link state";
1812                         goto fail;
1813         }
1814
1815         if (in_iface->references != 0) {
1816                 reply.errmsg = "references should be 0";
1817                 goto fail;
1818         }
1819
1820         for (i=0; i<ctdb->iface_map->num; i++) {
1821                 if (strcmp(ctdb->iface_map->iface[i].name,
1822                            in_iface->name) == 0) {
1823                         iface = &ctdb->iface_map->iface[i];
1824                         break;
1825                 }
1826         }
1827
1828         if (iface == NULL) {
1829                 reply.errmsg = "interface not found";
1830                 goto fail;
1831         }
1832
1833         iface->link_up = link_up;
1834
1835         reply.status = 0;
1836         reply.errmsg = NULL;
1837         client_send_control(req, header, &reply);
1838         return;
1839
1840 fail:
1841         reply.status = -1;
1842         client_send_control(req, header, &reply);
1843 }
1844
1845 static void control_get_runstate(TALLOC_CTX *mem_ctx,
1846                                  struct tevent_req *req,
1847                                  struct ctdb_req_header *header,
1848                                  struct ctdb_req_control *request)
1849 {
1850         struct client_state *state = tevent_req_data(
1851                 req, struct client_state);
1852         struct ctdbd_context *ctdb = state->ctdb;
1853         struct ctdb_reply_control reply;
1854
1855         reply.rdata.opcode = request->opcode;
1856         reply.rdata.data.runstate = ctdb->runstate;
1857         reply.status = 0;
1858         reply.errmsg = NULL;
1859
1860         client_send_control(req, header, &reply);
1861 }
1862
1863 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
1864                                    struct tevent_req *req,
1865                                    struct ctdb_req_header *header,
1866                                    struct ctdb_req_control *request)
1867 {
1868         struct ctdb_reply_control reply;
1869         struct ctdb_node_map *nodemap;
1870
1871         reply.rdata.opcode = request->opcode;
1872
1873         nodemap = read_nodes_file(mem_ctx, header->destnode);
1874         if (nodemap == NULL) {
1875                 goto fail;
1876         }
1877
1878         reply.rdata.data.nodemap = nodemap;
1879         reply.status = 0;
1880         reply.errmsg = NULL;
1881         client_send_control(req, header, &reply);
1882         return;
1883
1884 fail:
1885         reply.status = -1;
1886         reply.errmsg = "Failed to read nodes file";
1887         client_send_control(req, header, &reply);
1888 }
1889
1890 static void control_error(TALLOC_CTX *mem_ctx,
1891                           struct tevent_req *req,
1892                           struct ctdb_req_header *header,
1893                           struct ctdb_req_control *request)
1894 {
1895         struct ctdb_reply_control reply;
1896
1897         reply.rdata.opcode = request->opcode;
1898         reply.status = -1;
1899         reply.errmsg = "Not implemented";
1900
1901         client_send_control(req, header, &reply);
1902 }
1903
1904 /*
1905  * Handling protocol - messages
1906  */
1907
1908 struct disable_recoveries_state {
1909         struct node *node;
1910 };
1911
1912 static void disable_recoveries_callback(struct tevent_req *subreq)
1913 {
1914         struct disable_recoveries_state *substate = tevent_req_callback_data(
1915                 subreq, struct disable_recoveries_state);
1916         bool status;
1917
1918         status = tevent_wakeup_recv(subreq);
1919         TALLOC_FREE(subreq);
1920         if (! status) {
1921                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
1922         }
1923
1924         substate->node->recovery_disabled = false;
1925         TALLOC_FREE(substate->node->recovery_substate);
1926 }
1927
1928 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
1929                                        struct tevent_req *req,
1930                                        struct ctdb_req_header *header,
1931                                        struct ctdb_req_message *request)
1932 {
1933         struct client_state *state = tevent_req_data(
1934                 req, struct client_state);
1935         struct tevent_req *subreq;
1936         struct ctdbd_context *ctdb = state->ctdb;
1937         struct disable_recoveries_state *substate;
1938         struct ctdb_disable_message *disable = request->data.disable;
1939         struct ctdb_req_message_data reply;
1940         struct node *node;
1941         int ret = -1;
1942         TDB_DATA data;
1943
1944         node = &ctdb->node_map->node[header->destnode];
1945
1946         if (disable->timeout == 0) {
1947                 TALLOC_FREE(node->recovery_substate);
1948                 node->recovery_disabled = false;
1949                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
1950                                    header->destnode));
1951                 goto done;
1952         }
1953
1954         substate = talloc_zero(ctdb->node_map,
1955                                struct disable_recoveries_state);
1956         if (substate == NULL) {
1957                 goto fail;
1958         }
1959
1960         substate->node = node;
1961
1962         subreq = tevent_wakeup_send(substate, state->ev,
1963                                     tevent_timeval_current_ofs(
1964                                             disable->timeout, 0));
1965         if (subreq == NULL) {
1966                 talloc_free(substate);
1967                 goto fail;
1968         }
1969         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
1970
1971         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
1972                            disable->timeout, header->destnode));
1973         node->recovery_substate = substate;
1974         node->recovery_disabled = true;
1975
1976 done:
1977         ret = header->destnode;
1978
1979 fail:
1980         reply.srvid = disable->srvid;
1981         data.dptr = (uint8_t *)&ret;
1982         data.dsize = sizeof(int);
1983         reply.data = data;
1984
1985         client_send_message(req, header, &reply);
1986 }
1987
1988 /*
1989  * Handle a single client
1990  */
1991
1992 static void client_read_handler(uint8_t *buf, size_t buflen,
1993                                 void *private_data);
1994 static void client_dead_handler(void *private_data);
1995 static void client_process_packet(struct tevent_req *req,
1996                                   uint8_t *buf, size_t buflen);
1997 static void client_process_message(struct tevent_req *req,
1998                                    uint8_t *buf, size_t buflen);
1999 static void client_process_control(struct tevent_req *req,
2000                                    uint8_t *buf, size_t buflen);
2001 static void client_reply_done(struct tevent_req *subreq);
2002
2003 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
2004                                       struct tevent_context *ev,
2005                                       int fd, struct ctdbd_context *ctdb,
2006                                       int pnn)
2007 {
2008         struct tevent_req *req;
2009         struct client_state *state;
2010         int ret;
2011
2012         req = tevent_req_create(mem_ctx, &state, struct client_state);
2013         if (req == NULL) {
2014                 return NULL;
2015         }
2016
2017         state->ev = ev;
2018         state->fd = fd;
2019         state->ctdb = ctdb;
2020         state->pnn = pnn;
2021
2022         ret = comm_setup(state, ev, fd, client_read_handler, req,
2023                          client_dead_handler, req, &state->comm);
2024         if (ret != 0) {
2025                 tevent_req_error(req, ret);
2026                 return tevent_req_post(req, ev);
2027         }
2028
2029         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
2030
2031         return req;
2032 }
2033
2034 static void client_read_handler(uint8_t *buf, size_t buflen,
2035                                 void *private_data)
2036 {
2037         struct tevent_req *req = talloc_get_type_abort(
2038                 private_data, struct tevent_req);
2039         struct client_state *state = tevent_req_data(
2040                 req, struct client_state);
2041         struct ctdbd_context *ctdb = state->ctdb;
2042         struct ctdb_req_header header;
2043         int ret, i;
2044
2045         ret = ctdb_req_header_pull(buf, buflen, &header);
2046         if (ret != 0) {
2047                 return;
2048         }
2049
2050         if (buflen != header.length) {
2051                 return;
2052         }
2053
2054         ret = ctdb_req_header_verify(&header, 0);
2055         if (ret != 0) {
2056                 return;
2057         }
2058
2059         header_fix_pnn(&header, ctdb);
2060
2061         if (header.destnode == CTDB_BROADCAST_ALL) {
2062                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
2063                         header.destnode = i;
2064
2065                         ctdb_req_header_push(&header, buf);
2066                         client_process_packet(req, buf, buflen);
2067                 }
2068                 return;
2069         }
2070
2071         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
2072                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
2073                         if (ctdb->node_map->node[i].flags &
2074                             NODE_FLAGS_DISCONNECTED) {
2075                                 continue;
2076                         }
2077
2078                         header.destnode = i;
2079
2080                         ctdb_req_header_push(&header, buf);
2081                         client_process_packet(req, buf, buflen);
2082                 }
2083                 return;
2084         }
2085
2086         if (header.destnode > ctdb->node_map->num_nodes) {
2087                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
2088                         header.destnode);
2089                 return;
2090         }
2091
2092
2093         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
2094                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
2095                         header.destnode);
2096                 return;
2097         }
2098
2099         ctdb_req_header_push(&header, buf);
2100         client_process_packet(req, buf, buflen);
2101 }
2102
2103 static void client_dead_handler(void *private_data)
2104 {
2105         struct tevent_req *req = talloc_get_type_abort(
2106                 private_data, struct tevent_req);
2107
2108         tevent_req_done(req);
2109 }
2110
2111 static void client_process_packet(struct tevent_req *req,
2112                                   uint8_t *buf, size_t buflen)
2113 {
2114         struct ctdb_req_header header;
2115         int ret;
2116
2117         ret = ctdb_req_header_pull(buf, buflen, &header);
2118         if (ret != 0) {
2119                 return;
2120         }
2121
2122         switch (header.operation) {
2123         case CTDB_REQ_MESSAGE:
2124                 client_process_message(req, buf, buflen);
2125                 break;
2126
2127         case CTDB_REQ_CONTROL:
2128                 client_process_control(req, buf, buflen);
2129                 break;
2130
2131         default:
2132                 break;
2133         }
2134 }
2135
2136 static void client_process_message(struct tevent_req *req,
2137                                    uint8_t *buf, size_t buflen)
2138 {
2139         struct client_state *state = tevent_req_data(
2140                 req, struct client_state);
2141         struct ctdbd_context *ctdb = state->ctdb;
2142         TALLOC_CTX *mem_ctx;
2143         struct ctdb_req_header header;
2144         struct ctdb_req_message request;
2145         uint64_t srvid;
2146         int ret;
2147
2148         mem_ctx = talloc_new(state);
2149         if (tevent_req_nomem(mem_ctx, req)) {
2150                 return;
2151         }
2152
2153         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
2154         if (ret != 0) {
2155                 talloc_free(mem_ctx);
2156                 tevent_req_error(req, ret);
2157                 return;
2158         }
2159
2160         header_fix_pnn(&header, ctdb);
2161
2162         if (header.destnode >= ctdb->node_map->num_nodes) {
2163                 /* Many messages are not replied to, so just behave as
2164                  * though this message was not received */
2165                 fprintf(stderr, "Invalid node %d\n", header.destnode);
2166                 talloc_free(mem_ctx);
2167                 return;
2168         }
2169
2170         srvid = request.srvid;
2171         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
2172
2173         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
2174                 message_disable_recoveries(mem_ctx, req, &header, &request);
2175         }
2176
2177         /* check srvid */
2178         talloc_free(mem_ctx);
2179 }
2180
2181 static void client_process_control(struct tevent_req *req,
2182                                    uint8_t *buf, size_t buflen)
2183 {
2184         struct client_state *state = tevent_req_data(
2185                 req, struct client_state);
2186         struct ctdbd_context *ctdb = state->ctdb;
2187         TALLOC_CTX *mem_ctx;
2188         struct ctdb_req_header header;
2189         struct ctdb_req_control request;
2190         int ret;
2191
2192         mem_ctx = talloc_new(state);
2193         if (tevent_req_nomem(mem_ctx, req)) {
2194                 return;
2195         }
2196
2197         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
2198         if (ret != 0) {
2199                 talloc_free(mem_ctx);
2200                 tevent_req_error(req, ret);
2201                 return;
2202         }
2203
2204         header_fix_pnn(&header, ctdb);
2205
2206         if (header.destnode >= ctdb->node_map->num_nodes) {
2207                 struct ctdb_reply_control reply;
2208
2209                 reply.rdata.opcode = request.opcode;
2210                 reply.errmsg = "Invalid node";
2211                 reply.status = -1;
2212                 client_send_control(req, &header, &reply);
2213                 return;
2214         }
2215
2216         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
2217                            request.opcode, header.reqid));
2218
2219         switch (request.opcode) {
2220         case CTDB_CONTROL_PROCESS_EXISTS:
2221                 control_process_exists(mem_ctx, req, &header, &request);
2222                 break;
2223
2224         case CTDB_CONTROL_PING:
2225                 control_ping(mem_ctx, req, &header, &request);
2226                 break;
2227
2228         case CTDB_CONTROL_GETVNNMAP:
2229                 control_getvnnmap(mem_ctx, req, &header, &request);
2230                 break;
2231
2232         case CTDB_CONTROL_GET_DEBUG:
2233                 control_get_debug(mem_ctx, req, &header, &request);
2234                 break;
2235
2236         case CTDB_CONTROL_SET_DEBUG:
2237                 control_set_debug(mem_ctx, req, &header, &request);
2238                 break;
2239
2240         case CTDB_CONTROL_GET_RECMODE:
2241                 control_get_recmode(mem_ctx, req, &header, &request);
2242                 break;
2243
2244         case CTDB_CONTROL_SET_RECMODE:
2245                 control_set_recmode(mem_ctx, req, &header, &request);
2246                 break;
2247
2248         case CTDB_CONTROL_REGISTER_SRVID:
2249                 control_register_srvid(mem_ctx, req, &header, &request);
2250                 break;
2251
2252         case CTDB_CONTROL_DEREGISTER_SRVID:
2253                 control_deregister_srvid(mem_ctx, req, &header, &request);
2254                 break;
2255
2256         case CTDB_CONTROL_GET_PID:
2257                 control_get_pid(mem_ctx, req, &header, &request);
2258                 break;
2259
2260         case CTDB_CONTROL_GET_RECMASTER:
2261                 control_get_recmaster(mem_ctx, req, &header, &request);
2262                 break;
2263
2264         case CTDB_CONTROL_GET_PNN:
2265                 control_get_pnn(mem_ctx, req, &header, &request);
2266                 break;
2267
2268         case CTDB_CONTROL_SHUTDOWN:
2269                 control_shutdown(mem_ctx, req, &header, &request);
2270                 break;
2271
2272         case CTDB_CONTROL_GET_MONMODE:
2273                 control_get_monmode(mem_ctx, req, &header, &request);
2274                 break;
2275
2276         case CTDB_CONTROL_SET_TUNABLE:
2277                 control_set_tunable(mem_ctx, req, &header, &request);
2278                 break;
2279
2280         case CTDB_CONTROL_GET_TUNABLE:
2281                 control_get_tunable(mem_ctx, req, &header, &request);
2282                 break;
2283
2284         case CTDB_CONTROL_LIST_TUNABLES:
2285                 control_list_tunables(mem_ctx, req, &header, &request);
2286                 break;
2287
2288         case CTDB_CONTROL_GET_ALL_TUNABLES:
2289                 control_get_all_tunables(mem_ctx, req, &header, &request);
2290                 break;
2291
2292         case CTDB_CONTROL_UPTIME:
2293                 control_uptime(mem_ctx, req, &header, &request);
2294                 break;
2295
2296         case CTDB_CONTROL_ENABLE_MONITOR:
2297                 control_enable_monitor(mem_ctx, req, &header, &request);
2298                 break;
2299
2300         case CTDB_CONTROL_DISABLE_MONITOR:
2301                 control_disable_monitor(mem_ctx, req, &header, &request);
2302                 break;
2303
2304         case CTDB_CONTROL_RELOAD_NODES_FILE:
2305                 control_reload_nodes_file(mem_ctx, req, &header, &request);
2306                 break;
2307
2308         case CTDB_CONTROL_GET_CAPABILITIES:
2309                 control_get_capabilities(mem_ctx, req, &header, &request);
2310                 break;
2311
2312         case CTDB_CONTROL_GET_NODEMAP:
2313                 control_get_nodemap(mem_ctx, req, &header, &request);
2314                 break;
2315
2316         case CTDB_CONTROL_GET_RECLOCK_FILE:
2317                 control_get_reclock_file(mem_ctx, req, &header, &request);
2318                 break;
2319
2320         case CTDB_CONTROL_STOP_NODE:
2321                 control_stop_node(mem_ctx, req, &header, &request);
2322                 break;
2323
2324         case CTDB_CONTROL_CONTINUE_NODE:
2325                 control_continue_node(mem_ctx, req, &header, &request);
2326                 break;
2327
2328         case CTDB_CONTROL_GET_IFACES:
2329                 control_get_ifaces(mem_ctx, req, &header, &request);
2330                 break;
2331
2332         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
2333                 control_set_iface_link_state(mem_ctx, req, &header, &request);
2334                 break;
2335
2336         case CTDB_CONTROL_GET_RUNSTATE:
2337                 control_get_runstate(mem_ctx, req, &header, &request);
2338                 break;
2339
2340         case CTDB_CONTROL_GET_NODES_FILE:
2341                 control_get_nodes_file(mem_ctx, req, &header, &request);
2342                 break;
2343
2344         default:
2345                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
2346                         control_error(mem_ctx, req, &header, &request);
2347                 }
2348                 break;
2349         }
2350
2351         talloc_free(mem_ctx);
2352 }
2353
2354 static int client_recv(struct tevent_req *req, int *perr)
2355 {
2356         struct client_state *state = tevent_req_data(
2357                 req, struct client_state);
2358         int err;
2359
2360         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
2361         close(state->fd);
2362
2363         if (tevent_req_is_unix_error(req, &err)) {
2364                 if (perr != NULL) {
2365                         *perr = err;
2366                 }
2367                 return -1;
2368         }
2369
2370         return state->status;
2371 }
2372
2373 /*
2374  * Fake CTDB server
2375  */
2376
/* State of the accept loop run by server_send() */
struct server_state {
        struct tevent_context *ev;      /* event context for all requests */
        struct ctdbd_context *ctdb;     /* fake daemon state shared by clients */
        int fd;                         /* listening socket */
};
2382
2383 static void server_new_client(struct tevent_req *subreq);
2384 static void server_client_done(struct tevent_req *subreq);
2385
2386 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
2387                                       struct tevent_context *ev,
2388                                       struct ctdbd_context *ctdb,
2389                                       int fd)
2390 {
2391         struct tevent_req *req, *subreq;
2392         struct server_state *state;
2393
2394         req = tevent_req_create(mem_ctx, &state, struct server_state);
2395         if (req == NULL) {
2396                 return NULL;
2397         }
2398
2399         state->ev = ev;
2400         state->ctdb = ctdb;
2401         state->fd = fd;
2402
2403         subreq = accept_send(state, ev, fd);
2404         if (tevent_req_nomem(subreq, req)) {
2405                 return tevent_req_post(req, ev);
2406         }
2407         tevent_req_set_callback(subreq, server_new_client, req);
2408
2409         return req;
2410 }
2411
2412 static void server_new_client(struct tevent_req *subreq)
2413 {
2414         struct tevent_req *req = tevent_req_callback_data(
2415                 subreq, struct tevent_req);
2416         struct server_state *state = tevent_req_data(
2417                 req, struct server_state);
2418         struct ctdbd_context *ctdb = state->ctdb;
2419         int client_fd;
2420         int ret = 0;
2421
2422         client_fd = accept_recv(subreq, NULL, NULL, &ret);
2423         TALLOC_FREE(subreq);
2424         if (client_fd == -1) {
2425                 tevent_req_error(req, ret);
2426                 return;
2427         }
2428
2429         subreq = client_send(state, state->ev, client_fd,
2430                              ctdb, ctdb->node_map->pnn);
2431         if (tevent_req_nomem(subreq, req)) {
2432                 return;
2433         }
2434         tevent_req_set_callback(subreq, server_client_done, req);
2435
2436         ctdb->num_clients += 1;
2437
2438         subreq = accept_send(state, state->ev, state->fd);
2439         if (tevent_req_nomem(subreq, req)) {
2440                 return;
2441         }
2442         tevent_req_set_callback(subreq, server_new_client, req);
2443 }
2444
2445 static void server_client_done(struct tevent_req *subreq)
2446 {
2447         struct tevent_req *req = tevent_req_callback_data(
2448                 subreq, struct tevent_req);
2449         struct server_state *state = tevent_req_data(
2450                 req, struct server_state);
2451         struct ctdbd_context *ctdb = state->ctdb;
2452         int ret = 0;
2453         int status;
2454
2455         status = client_recv(subreq, &ret);
2456         TALLOC_FREE(subreq);
2457         if (status < 0) {
2458                 tevent_req_error(req, ret);
2459                 return;
2460         }
2461
2462         ctdb->num_clients -= 1;
2463
2464         if (status == 99) {
2465                 /* Special status, to shutdown server */
2466                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
2467                 tevent_req_done(req);
2468         }
2469 }
2470
/*
 * Collect the result of the server request.
 *
 * Returns true if the request finished without error.  On error
 * returns false and, when perr is not NULL, stores the error in *perr.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
2483
2484 /*
2485  * Main functions
2486  */
2487
2488 static int socket_init(const char *sockpath)
2489 {
2490         struct sockaddr_un addr;
2491         size_t len;
2492         int ret, fd;
2493
2494         memset(&addr, 0, sizeof(addr));
2495         addr.sun_family = AF_UNIX;
2496
2497         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
2498         if (len >= sizeof(addr.sun_path)) {
2499                 fprintf(stderr, "path too long: %s\n", sockpath);
2500                 return -1;
2501         }
2502
2503         fd = socket(AF_UNIX, SOCK_STREAM, 0);
2504         if (fd == -1) {
2505                 fprintf(stderr, "socket failed - %s\n", sockpath);
2506                 return -1;
2507         }
2508
2509         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
2510         if (ret != 0) {
2511                 fprintf(stderr, "bind failed - %s\n", sockpath);
2512                 goto fail;
2513         }
2514
2515         ret = listen(fd, 10);
2516         if (ret != 0) {
2517                 fprintf(stderr, "listen failed\n");
2518                 goto fail;
2519         }
2520
2521         DEBUG(DEBUG_INFO, ("Socket init done\n"));
2522
2523         return fd;
2524
2525 fail:
2526         if (fd != -1) {
2527                 close(fd);
2528         }
2529         return -1;
2530 }
2531
/* Command line settings, filled in by popt via cmdline_options */
static struct options {
        const char *sockpath;           /* --socket: Unix domain socket path */
        const char *pidfile;            /* --pidfile: pid file path */
        const char *debuglevel;         /* --debug: debug level name */
} options;
2537
2538 static struct poptOption cmdline_options[] = {
2539         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
2540                 "Unix domain socket path", "filename" },
2541         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
2542                 "pid file", "filename" } ,
2543         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
2544                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
2545 };
2546
2547 static void cleanup(void)
2548 {
2549         unlink(options.sockpath);
2550         unlink(options.pidfile);
2551 }
2552
/*
 * SIGTERM handler: remove the socket and pid file, then terminate with
 * status 0.
 *
 * _exit() is used instead of exit() because exit() is not
 * async-signal-safe: it runs atexit handlers (cleanup() is registered
 * as one in start_server(), so it would run a second time) and flushes
 * stdio from signal context.  cleanup() itself only calls unlink(),
 * which is async-signal-safe.
 */
static void signal_handler(int sig)
{
        (void)sig;

        cleanup();
        _exit(0);
}
2558
2559 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2560                          struct ctdbd_context *ctdb, int fd, int pfd)
2561 {
2562         struct tevent_req *req;
2563         int ret = 0;
2564         ssize_t len;
2565
2566         atexit(cleanup);
2567         signal(SIGTERM, signal_handler);
2568
2569         req = server_send(mem_ctx, ev, ctdb, fd);
2570         if (req == NULL) {
2571                 fprintf(stderr, "Memory error\n");
2572                 exit(1);
2573         }
2574
2575         len = write(pfd, &ret, sizeof(ret));
2576         if (len != sizeof(ret)) {
2577                 fprintf(stderr, "Failed to send message to parent\n");
2578                 exit(1);
2579         }
2580         close(pfd);
2581
2582         tevent_req_poll(req, ev);
2583
2584         server_recv(req, &ret);
2585         if (ret != 0) {
2586                 exit(1);
2587         }
2588 }
2589
2590 int main(int argc, const char *argv[])
2591 {
2592         TALLOC_CTX *mem_ctx;
2593         struct ctdbd_context *ctdb;
2594         struct tevent_context *ev;
2595         enum debug_level debug_level;
2596         poptContext pc;
2597         int opt, fd, ret, pfd[2];
2598         ssize_t len;
2599         pid_t pid;
2600         FILE *fp;
2601
2602         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
2603                             POPT_CONTEXT_KEEP_FIRST);
2604         while ((opt = poptGetNextOpt(pc)) != -1) {
2605                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
2606                 exit(1);
2607         }
2608
2609         if (options.sockpath == NULL) {
2610                 fprintf(stderr, "Please specify socket path\n");
2611                 poptPrintHelp(pc, stdout, 0);
2612                 exit(1);
2613         }
2614
2615         if (options.pidfile == NULL) {
2616                 fprintf(stderr, "Please specify pid file\n");
2617                 poptPrintHelp(pc, stdout, 0);
2618                 exit(1);
2619         }
2620
2621         if (options.debuglevel == NULL) {
2622                 DEBUGLEVEL = debug_level_to_int(DEBUG_ERR);
2623         } else {
2624                 if (debug_level_parse(options.debuglevel, &debug_level)) {
2625                         DEBUGLEVEL = debug_level_to_int(debug_level);
2626                 } else {
2627                         fprintf(stderr, "Invalid debug level\n");
2628                         poptPrintHelp(pc, stdout, 0);
2629                         exit(1);
2630                 }
2631         }
2632
2633         mem_ctx = talloc_new(NULL);
2634         if (mem_ctx == NULL) {
2635                 fprintf(stderr, "Memory error\n");
2636                 exit(1);
2637         }
2638
2639         ctdb = ctdbd_setup(mem_ctx);
2640         if (ctdb == NULL) {
2641                 exit(1);
2642         }
2643
2644         if (! ctdbd_verify(ctdb)) {
2645                 exit(1);
2646         }
2647
2648         ev = tevent_context_init(mem_ctx);
2649         if (ev == NULL) {
2650                 fprintf(stderr, "Memory error\n");
2651                 exit(1);
2652         }
2653
2654         fd = socket_init(options.sockpath);
2655         if (fd == -1) {
2656                 exit(1);
2657         }
2658
2659         ret = pipe(pfd);
2660         if (ret != 0) {
2661                 fprintf(stderr, "Failed to create pipe\n");
2662                 cleanup();
2663                 exit(1);
2664         }
2665
2666         pid = fork();
2667         if (pid == -1) {
2668                 fprintf(stderr, "Failed to fork\n");
2669                 cleanup();
2670                 exit(1);
2671         }
2672
2673         if (pid == 0) {
2674                 /* Child */
2675                 close(pfd[0]);
2676                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
2677                 exit(1);
2678         }
2679
2680         /* Parent */
2681         close(pfd[1]);
2682
2683         len = read(pfd[0], &ret, sizeof(ret));
2684         close(pfd[0]);
2685         if (len != sizeof(ret)) {
2686                 fprintf(stderr, "len = %zi\n", len);
2687                 fprintf(stderr, "Failed to get message from child\n");
2688                 kill(pid, SIGTERM);
2689                 exit(1);
2690         }
2691
2692         fp = fopen(options.pidfile, "w");
2693         if (fp == NULL) {
2694                 fprintf(stderr, "Failed to open pid file %s\n",
2695                         options.pidfile);
2696                 kill(pid, SIGTERM);
2697                 exit(1);
2698         }
2699         fprintf(fp, "%d\n", pid);
2700         fclose(fp);
2701
2702         return 0;
2703 }