ctdb-tests: Check all connections from a process in CHECK_PID_SRVID control
[samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23
24 #include <popt.h>
25 #include <talloc.h>
26 #include <tevent.h>
27 #include <tdb.h>
28
29 #include "lib/util/dlinklist.h"
30 #include "lib/util/tevent_unix.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/async_req/async_sock.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "protocol/protocol_util.h"
38
39 #include "common/comm.h"
40 #include "common/system.h"
41 #include "common/logging.h"
42 #include "common/tunable.h"
43 #include "common/srvid.h"
44
45 #include "ipalloc_read_known_ips.h"
46
47
/* Port number passed to parse_ip() for every node address in this file */
#define CTDB_PORT 4379

/* A fake flag that is only supported by some functions */
#define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
52
53 struct node {
54         ctdb_sock_addr addr;
55         uint32_t pnn;
56         uint32_t flags;
57         uint32_t capabilities;
58         bool recovery_disabled;
59         void *recovery_substate;
60 };
61
/* The set of nodes known to the fake daemon */
struct node_map {
	/* Number of valid entries in node[] */
	uint32_t num_nodes;
	/* talloc array of nodes, grown one entry at a time */
	struct node *node;
	/* PNN of the node tagged CURRENT, else CTDB_UNKNOWN_PNN */
	uint32_t pnn;
	/* PNN of the node tagged RECMASTER, else CTDB_UNKNOWN_PNN */
	uint32_t recmaster;
};
68
/* One network interface as described in the IFACES input section */
struct interface {
	/* Interface name, e.g. "eth1" */
	const char *name;
	/* True when the parsed link status was non-zero */
	bool link_up;
	/* Reference count taken verbatim from the input */
	uint32_t references;
};
74
/* All interfaces known to the fake daemon */
struct interface_map {
	/* Number of valid entries in iface[] */
	int num;
	/* talloc array of interfaces, grown one entry at a time */
	struct interface *iface;
};
79
/* Fake VNN map: recovery mode, generation and the list of lmasters */
struct vnn_map {
	/* CTDB_RECOVERY_* value */
	uint32_t recmode;
	/* Generation number; INVALID_GENERATION until one is set */
	uint32_t generation;
	/* Number of valid entries in map[] */
	uint32_t size;
	/* talloc array with one value per VNNMAP input line */
	uint32_t *map;
};
86
/* One database as described in the DBMAP input section */
struct database {
	/* Database name */
	const char *name;
	/* Database ID */
	uint32_t id;
	/* CTDB_DB_FLAGS_* bits */
	uint8_t flags;
	/* Sequence number; 0 unless given in the input */
	uint64_t seq_num;
};
93
/* All databases known to the fake daemon */
struct database_map {
	/* Number of valid entries in db[] */
	int num_dbs;
	/* talloc array of databases, grown one entry at a time */
	struct database *db;
};
98
99 struct fake_control_failure {
100         struct fake_control_failure  *prev, *next;
101         enum ctdb_controls opcode;
102         uint32_t pnn;
103         const char *error;
104         const char *comment;
105 };
106
/* Book-keeping record for one client of the fake daemon */
struct ctdb_client {
	/* Doubly-linked list pointers (see dlinklist.h) */
	struct ctdb_client *prev;
	struct ctdb_client *next;
	/* Daemon context whose client_list holds this record */
	struct ctdbd_context *ctdb;
	/* Process ID of the client */
	pid_t pid;
	/* Opaque per-client state; also the talloc parent of this record */
	void *state;
};
113
114 struct ctdbd_context {
115         struct node_map *node_map;
116         struct interface_map *iface_map;
117         struct vnn_map *vnn_map;
118         struct database_map *db_map;
119         struct srvid_context *srv;
120         int num_clients;
121         struct timeval start_time;
122         struct timeval recovery_start_time;
123         struct timeval recovery_end_time;
124         bool takeover_disabled;
125         int log_level;
126         enum ctdb_runstate runstate;
127         struct ctdb_tunable_list tun_list;
128         char *reclock;
129         struct ctdb_public_ip_list *known_ips;
130         struct fake_control_failure *control_failures;
131         struct ctdb_client *client_list;
132 };
133
134 /*
135  * Parse routines
136  */
137
138 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
139 {
140         struct node_map *node_map;
141
142         node_map = talloc_zero(mem_ctx, struct node_map);
143         if (node_map == NULL) {
144                 return NULL;
145         }
146
147         node_map->pnn = CTDB_UNKNOWN_PNN;
148         node_map->recmaster = CTDB_UNKNOWN_PNN;
149
150         return node_map;
151 }
152
153 /* Read a nodemap from stdin.  Each line looks like:
154  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
155  * EOF or a blank line terminates input.
156  *
157  * By default, capablities for each node are
158  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
159  * capabilities can be faked off by adding, for example,
160  * -CTDB_CAP_RECMASTER.
161  */
162
163 static bool nodemap_parse(struct node_map *node_map)
164 {
165         char line[1024];
166
167         while ((fgets(line, sizeof(line), stdin) != NULL)) {
168                 uint32_t pnn, flags, capabilities;
169                 char *tok, *t;
170                 char *ip;
171                 ctdb_sock_addr saddr;
172                 struct node *node;
173
174                 if (line[0] == '\n') {
175                         break;
176                 }
177
178                 /* Get rid of pesky newline */
179                 if ((t = strchr(line, '\n')) != NULL) {
180                         *t = '\0';
181                 }
182
183                 /* Get PNN */
184                 tok = strtok(line, " \t");
185                 if (tok == NULL) {
186                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
187                         continue;
188                 }
189                 pnn = (uint32_t)strtoul(tok, NULL, 0);
190
191                 /* Get IP */
192                 tok = strtok(NULL, " \t");
193                 if (tok == NULL) {
194                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
195                         continue;
196                 }
197                 if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
198                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
199                         continue;
200                 }
201                 ip = talloc_strdup(node_map, tok);
202                 if (ip == NULL) {
203                         goto fail;
204                 }
205
206                 /* Get flags */
207                 tok = strtok(NULL, " \t");
208                 if (tok == NULL) {
209                         fprintf(stderr, "bad line (%s) - missing flags\n",
210                                 line);
211                         continue;
212                 }
213                 flags = (uint32_t)strtoul(tok, NULL, 0);
214                 /* Handle deleted nodes */
215                 if (flags & NODE_FLAGS_DELETED) {
216                         talloc_free(ip);
217                         ip = talloc_strdup(node_map, "0.0.0.0");
218                         if (ip == NULL) {
219                                 goto fail;
220                         }
221                 }
222                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
223
224                 tok = strtok(NULL, " \t");
225                 while (tok != NULL) {
226                         if (strcmp(tok, "CURRENT") == 0) {
227                                 node_map->pnn = pnn;
228                         } else if (strcmp(tok, "RECMASTER") == 0) {
229                                 node_map->recmaster = pnn;
230                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
231                                 capabilities &= ~CTDB_CAP_RECMASTER;
232                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
233                                 capabilities &= ~CTDB_CAP_LMASTER;
234                         } else if (strcmp(tok, "TIMEOUT") == 0) {
235                                 /* This can be done with just a flag
236                                  * value but it is probably clearer
237                                  * and less error-prone to fake this
238                                  * with an explicit token */
239                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
240                         }
241                         tok = strtok(NULL, " \t");
242                 }
243
244                 node_map->node = talloc_realloc(node_map, node_map->node,
245                                                 struct node,
246                                                 node_map->num_nodes + 1);
247                 if (node_map->node == NULL) {
248                         goto fail;
249                 }
250                 node = &node_map->node[node_map->num_nodes];
251
252                 parse_ip(ip, NULL, CTDB_PORT, &node->addr);
253                 node->pnn = pnn;
254                 node->flags = flags;
255                 node->capabilities = capabilities;
256                 node->recovery_disabled = false;
257                 node->recovery_substate = NULL;
258
259                 node_map->num_nodes += 1;
260         }
261
262         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
263         return true;
264
265 fail:
266         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
267         return false;
268
269 }
270
271 /* Append a node to a node map with given address and flags */
272 static bool node_map_add(struct ctdb_node_map *nodemap,
273                          const char *nstr, uint32_t flags)
274 {
275         ctdb_sock_addr addr;
276         uint32_t num;
277         struct ctdb_node_and_flags *n;
278
279         if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
280                 fprintf(stderr, "Invalid IP address %s\n", nstr);
281                 return false;
282         }
283
284         num = nodemap->num;
285         nodemap->node = talloc_realloc(nodemap, nodemap->node,
286                                        struct ctdb_node_and_flags, num+1);
287         if (nodemap->node == NULL) {
288                 return false;
289         }
290
291         n = &nodemap->node[num];
292         n->addr = addr;
293         n->pnn = num;
294         n->flags = flags;
295
296         nodemap->num = num+1;
297         return true;
298 }
299
300 /* Read a nodes file into a node map */
301 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
302                                                   const char *nlist)
303 {
304         char **lines;
305         int nlines;
306         int i;
307         struct ctdb_node_map *nodemap;
308
309         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
310         if (nodemap == NULL) {
311                 return NULL;
312         }
313
314         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
315         if (lines == NULL) {
316                 return NULL;
317         }
318
319         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
320                 nlines--;
321         }
322
323         for (i=0; i<nlines; i++) {
324                 char *node;
325                 uint32_t flags;
326                 size_t len;
327
328                 node = lines[i];
329                 /* strip leading spaces */
330                 while((*node == ' ') || (*node == '\t')) {
331                         node++;
332                 }
333
334                 len = strlen(node);
335
336                 /* strip trailing spaces */
337                 while ((len > 1) &&
338                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
339                 {
340                         node[len-1] = '\0';
341                         len--;
342                 }
343
344                 if (len == 0) {
345                         continue;
346                 }
347                 if (*node == '#') {
348                         /* A "deleted" node is a node that is
349                            commented out in the nodes file.  This is
350                            used instead of removing a line, which
351                            would cause subsequent nodes to change
352                            their PNN. */
353                         flags = NODE_FLAGS_DELETED;
354                         node = discard_const("0.0.0.0");
355                 } else {
356                         flags = 0;
357                 }
358                 if (! node_map_add(nodemap, node, flags)) {
359                         talloc_free(lines);
360                         TALLOC_FREE(nodemap);
361                         return NULL;
362                 }
363         }
364
365         talloc_free(lines);
366         return nodemap;
367 }
368
369 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
370                                              uint32_t pnn)
371 {
372         struct ctdb_node_map *nodemap;
373         char nodepath[PATH_MAX];
374         const char *nodes_list;
375
376         /* read the nodes file */
377         sprintf(nodepath, "CTDB_NODES_%u", pnn);
378         nodes_list = getenv(nodepath);
379         if (nodes_list == NULL) {
380                 nodes_list = getenv("CTDB_NODES");
381                 if (nodes_list == NULL) {
382                         DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
383                         return NULL;
384                 }
385         }
386
387         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
388         if (nodemap == NULL) {
389                 DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
390                                    nodes_list));
391                 return NULL;
392         }
393
394         return nodemap;
395 }
396
397 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
398 {
399         struct interface_map *iface_map;
400
401         iface_map = talloc_zero(mem_ctx, struct interface_map);
402         if (iface_map == NULL) {
403                 return NULL;
404         }
405
406         return iface_map;
407 }
408
409 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
410  * output:
411  *   :Name:LinkStatus:References:
412  *   :eth2:1:4294967294
413  *   :eth1:1:4294967292
414  */
415
416 static bool interfaces_parse(struct interface_map *iface_map)
417 {
418         char line[1024];
419
420         while ((fgets(line, sizeof(line), stdin) != NULL)) {
421                 uint16_t link_state;
422                 uint32_t references;
423                 char *tok, *t, *name;
424                 struct interface *iface;
425
426                 if (line[0] == '\n') {
427                         break;
428                 }
429
430                 /* Get rid of pesky newline */
431                 if ((t = strchr(line, '\n')) != NULL) {
432                         *t = '\0';
433                 }
434
435                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
436                         continue;
437                 }
438
439                 /* Leading colon... */
440                 // tok = strtok(line, ":");
441
442                 /* name */
443                 tok = strtok(line, ":");
444                 if (tok == NULL) {
445                         fprintf(stderr, "bad line (%s) - missing name\n", line);
446                         continue;
447                 }
448                 name = tok;
449
450                 /* link_state */
451                 tok = strtok(NULL, ":");
452                 if (tok == NULL) {
453                         fprintf(stderr, "bad line (%s) - missing link state\n",
454                                 line);
455                         continue;
456                 }
457                 link_state = (uint16_t)strtoul(tok, NULL, 0);
458
459                 /* references... */
460                 tok = strtok(NULL, ":");
461                 if (tok == NULL) {
462                         fprintf(stderr, "bad line (%s) - missing references\n",
463                                 line);
464                         continue;
465                 }
466                 references = (uint32_t)strtoul(tok, NULL, 0);
467
468                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
469                                                   struct interface,
470                                                   iface_map->num + 1);
471                 if (iface_map->iface == NULL) {
472                         goto fail;
473                 }
474
475                 iface = &iface_map->iface[iface_map->num];
476
477                 iface->name = talloc_strdup(iface_map, name);
478                 if (iface->name == NULL) {
479                         goto fail;
480                 }
481                 iface->link_up = link_state;
482                 iface->references = references;
483
484                 iface_map->num += 1;
485         }
486
487         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
488         return true;
489
490 fail:
491         fprintf(stderr, "Parsing interfaces failed\n");
492         return false;
493 }
494
495 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
496 {
497         struct vnn_map *vnn_map;
498
499         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
500         if (vnn_map == NULL) {
501                 fprintf(stderr, "Memory error\n");
502                 return NULL;
503         }
504         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
505         vnn_map->generation = INVALID_GENERATION;
506
507         return vnn_map;
508 }
509
510 /* Read vnn map.
511  * output:
512  *   <GENERATION>
513  *   <LMASTER0>
514  *   <LMASTER1>
515  *   ...
516  */
517
518 static bool vnnmap_parse(struct vnn_map *vnn_map)
519 {
520         char line[1024];
521
522         while (fgets(line, sizeof(line), stdin) != NULL) {
523                 uint32_t n;
524                 char *t;
525
526                 if (line[0] == '\n') {
527                         break;
528                 }
529
530                 /* Get rid of pesky newline */
531                 if ((t = strchr(line, '\n')) != NULL) {
532                         *t = '\0';
533                 }
534
535                 n = (uint32_t) strtol(line, NULL, 0);
536
537                 /* generation */
538                 if (vnn_map->generation == INVALID_GENERATION) {
539                         vnn_map->generation = n;
540                         continue;
541                 }
542
543                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
544                                               vnn_map->size + 1);
545                 if (vnn_map->map == NULL) {
546                         fprintf(stderr, "Memory error\n");
547                         goto fail;
548                 }
549
550                 vnn_map->map[vnn_map->size] = n;
551                 vnn_map->size += 1;
552         }
553
554         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
555         return true;
556
557 fail:
558         fprintf(stderr, "Parsing vnnmap failed\n");
559         return false;
560 }
561
562 static bool reclock_parse(struct ctdbd_context *ctdb)
563 {
564         char line[1024];
565         char *t;
566
567         if (fgets(line, sizeof(line), stdin) == NULL) {
568                 goto fail;
569         }
570
571         if (line[0] == '\n') {
572                 /* Recovery lock remains unset */
573                 goto ok;
574         }
575
576         /* Get rid of pesky newline */
577         if ((t = strchr(line, '\n')) != NULL) {
578                 *t = '\0';
579         }
580
581         ctdb->reclock = talloc_strdup(ctdb, line);
582         if (ctdb->reclock == NULL) {
583                 goto fail;
584         }
585 ok:
586         /* Swallow possible blank line following section.  Picky
587          * compiler settings don't allow the return value to be
588          * ignored, so make the compiler happy.
589          */
590         if (fgets(line, sizeof(line), stdin) == NULL) {
591                 ;
592         }
593         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
594         return true;
595
596 fail:
597         fprintf(stderr, "Parsing reclock failed\n");
598         return false;
599 }
600
601 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx)
602 {
603         struct database_map *db_map;
604
605         db_map = talloc_zero(mem_ctx, struct database_map);
606         if (db_map == NULL) {
607                 return NULL;
608         }
609
610         return db_map;
611 }
612
613 /* Read a database map from stdin.  Each line looks like:
614  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
615  * EOF or a blank line terminates input.
616  *
617  * By default, flags and seq_num are 0
618  */
619
620 static bool dbmap_parse(struct database_map *db_map)
621 {
622         char line[1024];
623
624         while ((fgets(line, sizeof(line), stdin) != NULL)) {
625                 uint32_t id;
626                 uint8_t flags = 0;
627                 uint32_t seq_num = 0;
628                 char *tok, *t;
629                 char *name;
630                 struct database *db;
631
632                 if (line[0] == '\n') {
633                         break;
634                 }
635
636                 /* Get rid of pesky newline */
637                 if ((t = strchr(line, '\n')) != NULL) {
638                         *t = '\0';
639                 }
640
641                 /* Get ID */
642                 tok = strtok(line, " \t");
643                 if (tok == NULL) {
644                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
645                         continue;
646                 }
647                 id = (uint32_t)strtoul(tok, NULL, 0);
648
649                 /* Get NAME */
650                 tok = strtok(NULL, " \t");
651                 if (tok == NULL) {
652                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
653                         continue;
654                 }
655                 name = talloc_strdup(db_map, tok);
656                 if (name == NULL) {
657                         goto fail;
658                 }
659
660                 /* Get flags */
661                 tok = strtok(NULL, " \t");
662                 while (tok != NULL) {
663                         if (strcmp(tok, "PERSISTENT") == 0) {
664                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
665                         } else if (strcmp(tok, "STICKY") == 0) {
666                                 flags |= CTDB_DB_FLAGS_STICKY;
667                         } else if (strcmp(tok, "READONLY") == 0) {
668                                 flags |= CTDB_DB_FLAGS_READONLY;
669                         } else if (strcmp(tok, "REPLICATED") == 0) {
670                                 flags |= CTDB_DB_FLAGS_REPLICATED;
671                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
672                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
673                                              CTDB_DB_FLAGS_REPLICATED;
674
675                                 if ((flags & nv) == 0) {
676                                         fprintf(stderr,
677                                                 "seq_num for volatile db\n");
678                                         goto fail;
679                                 }
680                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
681                         }
682
683                         tok = strtok(NULL, " \t");
684                 }
685
686                 db_map->db = talloc_realloc(db_map, db_map->db,
687                                             struct database,
688                                             db_map->num_dbs + 1);
689                 if (db_map->db == NULL) {
690                         goto fail;
691                 }
692                 db = &db_map->db[db_map->num_dbs];
693
694                 db->id = id;
695                 db->name = name;
696                 db->flags = flags;
697                 db->seq_num = seq_num;
698
699                 db_map->num_dbs += 1;
700         }
701
702         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
703         return true;
704
705 fail:
706         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
707         return false;
708
709 }
710
711 static struct database *database_find(struct database_map *map,
712                                       uint32_t db_id)
713 {
714         int i;
715
716         for (i = 0; i < map->num_dbs; i++) {
717                 struct database *db = &map->db[i];
718
719                 if (db->id == db_id) {
720                         return db;
721                 }
722         }
723
724         return NULL;
725 }
726
727 static bool public_ips_parse(struct ctdbd_context *ctdb,
728                              uint32_t numnodes)
729 {
730         if (numnodes == 0) {
731                 D_ERR("Must initialise nodemap before public IPs\n");
732                 return false;
733         }
734
735         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
736
737         return (ctdb->known_ips != NULL);
738 }
739
740 /* Read information about controls to fail.  Format is:
741  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
742  */
743 static bool control_failures_parse(struct ctdbd_context *ctdb)
744 {
745         char line[1024];
746
747         while ((fgets(line, sizeof(line), stdin) != NULL)) {
748                 char *tok, *t;
749                 enum ctdb_controls opcode;
750                 uint32_t pnn;
751                 const char *error;
752                 const char *comment;
753                 struct fake_control_failure *failure = NULL;
754
755                 if (line[0] == '\n') {
756                         break;
757                 }
758
759                 /* Get rid of pesky newline */
760                 if ((t = strchr(line, '\n')) != NULL) {
761                         *t = '\0';
762                 }
763
764                 /* Get opcode */
765                 tok = strtok(line, " \t");
766                 if (tok == NULL) {
767                         D_ERR("bad line (%s) - missing opcode\n", line);
768                         continue;
769                 }
770                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
771
772                 /* Get PNN */
773                 tok = strtok(NULL, " \t");
774                 if (tok == NULL) {
775                         D_ERR("bad line (%s) - missing PNN\n", line);
776                         continue;
777                 }
778                 pnn = (uint32_t)strtoul(tok, NULL, 0);
779
780                 /* Get error */
781                 tok = strtok(NULL, " \t");
782                 if (tok == NULL) {
783                         D_ERR("bad line (%s) - missing errno\n", line);
784                         continue;
785                 }
786                 error = talloc_strdup(ctdb, tok);
787                 if (error == NULL) {
788                         goto fail;
789                 }
790                 if (strcmp(error, "ERROR") != 0 &&
791                     strcmp(error, "TIMEOUT") != 0) {
792                         D_ERR("bad line (%s) "
793                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
794                               line);
795                         goto fail;
796                 }
797
798                 /* Get comment */
799                 tok = strtok(NULL, "\n"); /* rest of line */
800                 if (tok == NULL) {
801                         D_ERR("bad line (%s) - missing comment\n", line);
802                         continue;
803                 }
804                 comment = talloc_strdup(ctdb, tok);
805                 if (comment == NULL) {
806                         goto fail;
807                 }
808
809                 failure = talloc_zero(ctdb, struct fake_control_failure);
810                 if (failure == NULL) {
811                         goto fail;
812                 }
813
814                 failure->opcode = opcode;
815                 failure->pnn = pnn;
816                 failure->error = error;
817                 failure->comment = comment;
818
819                 DLIST_ADD(ctdb->control_failures, failure);
820         }
821
822         D_INFO("Parsing fake control failures done\n");
823         return true;
824
825 fail:
826         D_INFO("Parsing fake control failures failed\n");
827         return false;
828 }
829
830 /*
831  * Manage clients
832  */
833
834 static int ctdb_client_destructor(struct ctdb_client *client)
835 {
836         DLIST_REMOVE(client->ctdb->client_list, client);
837         return 0;
838 }
839
840 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
841                       void *client_state)
842 {
843         struct ctdb_client *client;
844
845         client = talloc_zero(client_state, struct ctdb_client);
846         if (client == NULL) {
847                 return ENOMEM;
848         }
849
850         client->ctdb = ctdb;
851         client->pid = client_pid;
852         client->state = client_state;
853
854         DLIST_ADD(ctdb->client_list, client);
855         talloc_set_destructor(client, ctdb_client_destructor);
856         return 0;
857 }
858
859 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
860 {
861         struct ctdb_client *client;
862
863         for (client=ctdb->client_list; client != NULL; client=client->next) {
864                 if (client->pid == client_pid) {
865                         return client->state;
866                 }
867         }
868
869         return NULL;
870 }
871
872 /*
873  * CTDB context setup
874  */
875
876 static uint32_t new_generation(uint32_t old_generation)
877 {
878         uint32_t generation;
879
880         while (1) {
881                 generation = random();
882                 if (generation != INVALID_GENERATION &&
883                     generation != old_generation) {
884                         break;
885                 }
886         }
887
888         return generation;
889 }
890
891 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
892 {
893         struct ctdbd_context *ctdb;
894         char line[1024];
895         bool status;
896         int ret;
897
898         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
899         if (ctdb == NULL) {
900                 return NULL;
901         }
902
903         ctdb->node_map = nodemap_init(ctdb);
904         if (ctdb->node_map == NULL) {
905                 goto fail;
906         }
907
908         ctdb->iface_map = interfaces_init(ctdb);
909         if (ctdb->iface_map == NULL) {
910                 goto fail;
911         }
912
913         ctdb->vnn_map = vnnmap_init(ctdb);
914         if (ctdb->vnn_map == NULL) {
915                 goto fail;
916         }
917
918         ctdb->db_map = dbmap_init(ctdb);
919         if (ctdb->db_map == NULL) {
920                 goto fail;
921         }
922
923         ret = srvid_init(ctdb, &ctdb->srv);
924         if (ret != 0) {
925                 goto fail;
926         }
927
928         while (fgets(line, sizeof(line), stdin) != NULL) {
929                 char *t;
930
931                 if ((t = strchr(line, '\n')) != NULL) {
932                         *t = '\0';
933                 }
934
935                 if (strcmp(line, "NODEMAP") == 0) {
936                         status = nodemap_parse(ctdb->node_map);
937                 } else if (strcmp(line, "IFACES") == 0) {
938                         status = interfaces_parse(ctdb->iface_map);
939                 } else if (strcmp(line, "VNNMAP") == 0) {
940                         status = vnnmap_parse(ctdb->vnn_map);
941                 } else if (strcmp(line, "DBMAP") == 0) {
942                         status = dbmap_parse(ctdb->db_map);
943                 } else if (strcmp(line, "PUBLICIPS") == 0) {
944                         status = public_ips_parse(ctdb,
945                                                   ctdb->node_map->num_nodes);
946                 } else if (strcmp(line, "RECLOCK") == 0) {
947                         status = reclock_parse(ctdb);
948                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
949                         status = control_failures_parse(ctdb);
950                 } else {
951                         fprintf(stderr, "Unknown line %s\n", line);
952                         status = false;
953                 }
954
955                 if (! status) {
956                         goto fail;
957                 }
958         }
959
960         ctdb->start_time = tevent_timeval_current();
961         ctdb->recovery_start_time = tevent_timeval_current();
962         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
963         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
964                 ctdb->vnn_map->generation =
965                         new_generation(ctdb->vnn_map->generation);
966         }
967         ctdb->recovery_end_time = tevent_timeval_current();
968
969         ctdb->log_level = DEBUG_ERR;
970         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
971
972         ctdb_tunable_set_defaults(&ctdb->tun_list);
973
974         return ctdb;
975
976 fail:
977         TALLOC_FREE(ctdb);
978         return NULL;
979 }
980
981 static bool ctdbd_verify(struct ctdbd_context *ctdb)
982 {
983         struct node *node;
984         int i;
985
986         if (ctdb->node_map->num_nodes == 0) {
987                 return true;
988         }
989
990         /* Make sure all the nodes are in order */
991         for (i=0; i<ctdb->node_map->num_nodes; i++) {
992                 node = &ctdb->node_map->node[i];
993                 if (node->pnn != i) {
994                         fprintf(stderr, "Expected node %u, found %u\n",
995                                 i, node->pnn);
996                         return false;
997                 }
998         }
999
1000         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1001         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1002                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1003                 exit(0);
1004         }
1005
1006         return true;
1007 }
1008
1009 /*
1010  * Doing a recovery
1011  */
1012
/* State carried by a fake recovery request (recover_send/recover_recv) */
struct recover_state {
        /* Event context on which the wakeup timers are scheduled */
        struct tevent_context *ev;
        /* Daemon context whose node map and recovery fields are used */
        struct ctdbd_context *ctdb;
};
1017
1018 static int recover_check(struct tevent_req *req);
1019 static void recover_wait_done(struct tevent_req *subreq);
1020 static void recover_done(struct tevent_req *subreq);
1021
1022 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1023                                        struct tevent_context *ev,
1024                                        struct ctdbd_context *ctdb)
1025 {
1026         struct tevent_req *req;
1027         struct recover_state *state;
1028         int ret;
1029
1030         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1031         if (req == NULL) {
1032                 return NULL;
1033         }
1034
1035         state->ev = ev;
1036         state->ctdb = ctdb;
1037
1038         ret = recover_check(req);
1039         if (ret != 0) {
1040                 tevent_req_error(req, ret);
1041                 return tevent_req_post(req, ev);
1042         }
1043
1044         return req;
1045 }
1046
1047 static int recover_check(struct tevent_req *req)
1048 {
1049         struct recover_state *state = tevent_req_data(
1050                 req, struct recover_state);
1051         struct ctdbd_context *ctdb = state->ctdb;
1052         struct tevent_req *subreq;
1053         bool recovery_disabled;
1054         int i;
1055
1056         recovery_disabled = false;
1057         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1058                 if (ctdb->node_map->node[i].recovery_disabled) {
1059                         recovery_disabled = true;
1060                         break;
1061                 }
1062         }
1063
1064         subreq = tevent_wakeup_send(state, state->ev,
1065                                     tevent_timeval_current_ofs(1, 0));
1066         if (subreq == NULL) {
1067                 return ENOMEM;
1068         }
1069
1070         if (recovery_disabled) {
1071                 tevent_req_set_callback(subreq, recover_wait_done, req);
1072         } else {
1073                 ctdb->recovery_start_time = tevent_timeval_current();
1074                 tevent_req_set_callback(subreq, recover_done, req);
1075         }
1076
1077         return 0;
1078 }
1079
1080 static void recover_wait_done(struct tevent_req *subreq)
1081 {
1082         struct tevent_req *req = tevent_req_callback_data(
1083                 subreq, struct tevent_req);
1084         int ret;
1085         bool status;
1086
1087         status = tevent_wakeup_recv(subreq);
1088         TALLOC_FREE(subreq);
1089         if (! status) {
1090                 tevent_req_error(req, EIO);
1091                 return;
1092         }
1093
1094         ret = recover_check(req);
1095         if (ret != 0) {
1096                 tevent_req_error(req, ret);
1097         }
1098 }
1099
1100 static void recover_done(struct tevent_req *subreq)
1101 {
1102         struct tevent_req *req = tevent_req_callback_data(
1103                 subreq, struct tevent_req);
1104         struct recover_state *state = tevent_req_data(
1105                 req, struct recover_state);
1106         struct ctdbd_context *ctdb = state->ctdb;
1107         bool status;
1108
1109         status = tevent_wakeup_recv(subreq);
1110         TALLOC_FREE(subreq);
1111         if (! status) {
1112                 tevent_req_error(req, EIO);
1113                 return;
1114         }
1115
1116         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1117         ctdb->recovery_end_time = tevent_timeval_current();
1118         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1119
1120         tevent_req_done(req);
1121 }
1122
/*
 * Collect the result of a fake recovery.
 *
 * Returns true if the request finished without a unix error.  Otherwise
 * returns false and, when perr is not NULL, stores the error in *perr.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
1136
1137 /*
1138  * Routines for ctdb_req_header
1139  */
1140
1141 static void header_fix_pnn(struct ctdb_req_header *header,
1142                            struct ctdbd_context *ctdb)
1143 {
1144         if (header->srcnode == CTDB_CURRENT_NODE) {
1145                 header->srcnode = ctdb->node_map->pnn;
1146         }
1147
1148         if (header->destnode == CTDB_CURRENT_NODE) {
1149                 header->destnode = ctdb->node_map->pnn;
1150         }
1151 }
1152
1153 static struct ctdb_req_header header_reply_control(
1154                                         struct ctdb_req_header *header,
1155                                         struct ctdbd_context *ctdb)
1156 {
1157         struct ctdb_req_header reply_header;
1158
1159         reply_header = (struct ctdb_req_header) {
1160                 .ctdb_magic = CTDB_MAGIC,
1161                 .ctdb_version = CTDB_PROTOCOL,
1162                 .generation = ctdb->vnn_map->generation,
1163                 .operation = CTDB_REPLY_CONTROL,
1164                 .destnode = header->srcnode,
1165                 .srcnode = header->destnode,
1166                 .reqid = header->reqid,
1167         };
1168
1169         return reply_header;
1170 }
1171
1172 static struct ctdb_req_header header_reply_message(
1173                                         struct ctdb_req_header *header,
1174                                         struct ctdbd_context *ctdb)
1175 {
1176         struct ctdb_req_header reply_header;
1177
1178         reply_header = (struct ctdb_req_header) {
1179                 .ctdb_magic = CTDB_MAGIC,
1180                 .ctdb_version = CTDB_PROTOCOL,
1181                 .generation = ctdb->vnn_map->generation,
1182                 .operation = CTDB_REQ_MESSAGE,
1183                 .destnode = header->srcnode,
1184                 .srcnode = header->destnode,
1185                 .reqid = 0,
1186         };
1187
1188         return reply_header;
1189 }
1190
1191 /*
1192  * Client state
1193  */
1194
/* State attached to the request that serves one client */
struct client_state {
        /* Event context used for writes back to the client */
        struct tevent_context *ev;
        /* NOTE(review): presumably the client socket - confirm at setup */
        int fd;
        /* Fake daemon context shared by all clients */
        struct ctdbd_context *ctdb;
        /* NOTE(review): presumably the PNN this client talks to - confirm */
        int pnn;
        /* NOTE(review): presumably the client's process id - confirm */
        pid_t pid;
        /* Communication context that replies are written through */
        struct comm_context *comm;
        /* NOTE(review): srvid registration state; usage not shown here */
        struct srvid_register_state *rstate;
        /* Set to 99 by the SHUTDOWN control */
        int status;
};
1205
1206 /*
1207  * Send replies to controls and messages
1208  */
1209
1210 static void client_reply_done(struct tevent_req *subreq);
1211
1212 static void client_send_message(struct tevent_req *req,
1213                                 struct ctdb_req_header *header,
1214                                 struct ctdb_req_message_data *message)
1215 {
1216         struct client_state *state = tevent_req_data(
1217                 req, struct client_state);
1218         struct ctdbd_context *ctdb = state->ctdb;
1219         struct tevent_req *subreq;
1220         struct ctdb_req_header reply_header;
1221         uint8_t *buf;
1222         size_t datalen, buflen;
1223         int ret;
1224
1225         reply_header = header_reply_message(header, ctdb);
1226
1227         datalen = ctdb_req_message_data_len(&reply_header, message);
1228         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1229         if (ret != 0) {
1230                 tevent_req_error(req, ret);
1231                 return;
1232         }
1233
1234         ret = ctdb_req_message_data_push(&reply_header, message,
1235                                          buf, &buflen);
1236         if (ret != 0) {
1237                 tevent_req_error(req, ret);
1238                 return;
1239         }
1240
1241         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1242
1243         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1244         if (tevent_req_nomem(subreq, req)) {
1245                 return;
1246         }
1247         tevent_req_set_callback(subreq, client_reply_done, req);
1248
1249         talloc_steal(subreq, buf);
1250 }
1251
1252 static void client_send_control(struct tevent_req *req,
1253                                 struct ctdb_req_header *header,
1254                                 struct ctdb_reply_control *reply)
1255 {
1256         struct client_state *state = tevent_req_data(
1257                 req, struct client_state);
1258         struct ctdbd_context *ctdb = state->ctdb;
1259         struct tevent_req *subreq;
1260         struct ctdb_req_header reply_header;
1261         uint8_t *buf;
1262         size_t datalen, buflen;
1263         int ret;
1264
1265         reply_header = header_reply_control(header, ctdb);
1266
1267         datalen = ctdb_reply_control_len(&reply_header, reply);
1268         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1269         if (ret != 0) {
1270                 tevent_req_error(req, ret);
1271                 return;
1272         }
1273
1274         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1275         if (ret != 0) {
1276                 tevent_req_error(req, ret);
1277                 return;
1278         }
1279
1280         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1281
1282         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1283         if (tevent_req_nomem(subreq, req)) {
1284                 return;
1285         }
1286         tevent_req_set_callback(subreq, client_reply_done, req);
1287
1288         talloc_steal(subreq, buf);
1289 }
1290
1291 static void client_reply_done(struct tevent_req *subreq)
1292 {
1293         struct tevent_req *req = tevent_req_callback_data(
1294                 subreq, struct tevent_req);
1295         int ret;
1296         bool status;
1297
1298         status = comm_write_recv(subreq, &ret);
1299         TALLOC_FREE(subreq);
1300         if (! status) {
1301                 tevent_req_error(req, ret);
1302         }
1303 }
1304
1305 /*
1306  * Handling protocol - controls
1307  */
1308
1309 static void control_process_exists(TALLOC_CTX *mem_ctx,
1310                                    struct tevent_req *req,
1311                                    struct ctdb_req_header *header,
1312                                    struct ctdb_req_control *request)
1313 {
1314         struct client_state *state = tevent_req_data(
1315                 req, struct client_state);
1316         struct ctdbd_context *ctdb = state->ctdb;
1317         struct client_state *cstate;
1318         struct ctdb_reply_control reply;
1319
1320         reply.rdata.opcode = request->opcode;
1321
1322         cstate = client_find(ctdb, request->rdata.data.pid);
1323         if (cstate == NULL) {
1324                 reply.status = -1;
1325                 reply.errmsg = "No client for PID";
1326         } else {
1327                 reply.status = kill(request->rdata.data.pid, 0);
1328                 reply.errmsg = NULL;
1329         }
1330
1331         client_send_control(req, header, &reply);
1332 }
1333
1334 static void control_ping(TALLOC_CTX *mem_ctx,
1335                          struct tevent_req *req,
1336                          struct ctdb_req_header *header,
1337                          struct ctdb_req_control *request)
1338 {
1339         struct client_state *state = tevent_req_data(
1340                 req, struct client_state);
1341         struct ctdbd_context *ctdb = state->ctdb;
1342         struct ctdb_reply_control reply;
1343
1344         reply.rdata.opcode = request->opcode;
1345         reply.status = ctdb->num_clients;
1346         reply.errmsg = NULL;
1347
1348         client_send_control(req, header, &reply);
1349 }
1350
1351 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1352                               struct tevent_req *req,
1353                               struct ctdb_req_header *header,
1354                               struct ctdb_req_control *request)
1355 {
1356         struct client_state *state = tevent_req_data(
1357                 req, struct client_state);
1358         struct ctdbd_context *ctdb = state->ctdb;
1359         struct ctdb_reply_control reply;
1360         struct database *db;
1361
1362         reply.rdata.opcode = request->opcode;
1363
1364         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1365         if (db == NULL) {
1366                 reply.status = ENOENT;
1367                 reply.errmsg = "Database not found";
1368         } else {
1369                 const char *base;
1370                 if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
1371                         base = "/var/lib/ctdb/persistent";
1372                 } else {
1373                         base = "/var/run/ctdb/DB_DIR";
1374                 }
1375                 reply.rdata.data.db_path =
1376                         talloc_asprintf(mem_ctx, "%s/%s.%u",
1377                                         base, db->name, header->destnode);
1378                 if (reply.rdata.data.db_path == NULL) {
1379                         reply.status = ENOMEM;
1380                         reply.errmsg = "Memory error";
1381                 } else {
1382                         reply.status = 0;
1383                         reply.errmsg = NULL;
1384                 }
1385         }
1386
1387         client_send_control(req, header, &reply);
1388 }
1389
1390 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1391                               struct tevent_req *req,
1392                               struct ctdb_req_header *header,
1393                               struct ctdb_req_control *request)
1394 {
1395         struct client_state *state = tevent_req_data(
1396                 req, struct client_state);
1397         struct ctdbd_context *ctdb = state->ctdb;
1398         struct ctdb_reply_control reply;
1399         struct ctdb_vnn_map *vnnmap;
1400
1401         reply.rdata.opcode = request->opcode;
1402
1403         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1404         if (vnnmap == NULL) {
1405                 reply.status = ENOMEM;
1406                 reply.errmsg = "Memory error";
1407         } else {
1408                 vnnmap->generation = ctdb->vnn_map->generation;
1409                 vnnmap->size = ctdb->vnn_map->size;
1410                 vnnmap->map = ctdb->vnn_map->map;
1411
1412                 reply.rdata.data.vnnmap = vnnmap;
1413                 reply.status = 0;
1414                 reply.errmsg = NULL;
1415         }
1416
1417         client_send_control(req, header, &reply);
1418 }
1419
1420 static void control_get_debug(TALLOC_CTX *mem_ctx,
1421                               struct tevent_req *req,
1422                               struct ctdb_req_header *header,
1423                               struct ctdb_req_control *request)
1424 {
1425         struct client_state *state = tevent_req_data(
1426                 req, struct client_state);
1427         struct ctdbd_context *ctdb = state->ctdb;
1428         struct ctdb_reply_control reply;
1429
1430         reply.rdata.opcode = request->opcode;
1431         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1432         reply.status = 0;
1433         reply.errmsg = NULL;
1434
1435         client_send_control(req, header, &reply);
1436 }
1437
1438 static void control_set_debug(TALLOC_CTX *mem_ctx,
1439                               struct tevent_req *req,
1440                               struct ctdb_req_header *header,
1441                               struct ctdb_req_control *request)
1442 {
1443         struct client_state *state = tevent_req_data(
1444                 req, struct client_state);
1445         struct ctdbd_context *ctdb = state->ctdb;
1446         struct ctdb_reply_control reply;
1447
1448         ctdb->log_level = (int)request->rdata.data.loglevel;
1449
1450         reply.rdata.opcode = request->opcode;
1451         reply.status = 0;
1452         reply.errmsg = NULL;
1453
1454         client_send_control(req, header, &reply);
1455 }
1456
1457 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1458                               struct tevent_req *req,
1459                                struct ctdb_req_header *header,
1460                               struct ctdb_req_control *request)
1461 {
1462         struct client_state *state = tevent_req_data(
1463                 req, struct client_state);
1464         struct ctdbd_context *ctdb = state->ctdb;
1465         struct ctdb_reply_control reply;
1466         struct ctdb_dbid_map *dbmap;
1467         int i;
1468
1469         reply.rdata.opcode = request->opcode;
1470
1471         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1472         if (dbmap == NULL) {
1473                 goto fail;
1474         }
1475
1476         dbmap->num = ctdb->db_map->num_dbs;
1477         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1478         if (dbmap->dbs == NULL) {
1479                 goto fail;
1480         }
1481
1482         for (i = 0; i < dbmap->num; i++) {
1483                 struct database *db = &ctdb->db_map->db[i];
1484                 dbmap->dbs[i] = (struct ctdb_dbid) {
1485                         .db_id = db->id,
1486                         .flags = db->flags,
1487                 };
1488         }
1489
1490         reply.rdata.data.dbmap = dbmap;
1491         reply.status = 0;
1492         reply.errmsg = NULL;
1493         client_send_control(req, header, &reply);
1494         return;
1495
1496 fail:
1497         reply.status = -1;
1498         reply.errmsg = "Memory error";
1499         client_send_control(req, header, &reply);
1500 }
1501
1502 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1503                                 struct tevent_req *req,
1504                                 struct ctdb_req_header *header,
1505                                 struct ctdb_req_control *request)
1506 {
1507         struct client_state *state = tevent_req_data(
1508                 req, struct client_state);
1509         struct ctdbd_context *ctdb = state->ctdb;
1510         struct ctdb_reply_control reply;
1511
1512         reply.rdata.opcode = request->opcode;
1513         reply.status = ctdb->vnn_map->recmode;
1514         reply.errmsg = NULL;
1515
1516         client_send_control(req, header, &reply);
1517 }
1518
1519 struct set_recmode_state {
1520         struct tevent_req *req;
1521         struct ctdbd_context *ctdb;
1522         struct ctdb_req_header header;
1523         struct ctdb_reply_control reply;
1524 };
1525
1526 static void set_recmode_callback(struct tevent_req *subreq)
1527 {
1528         struct set_recmode_state *substate = tevent_req_callback_data(
1529                 subreq, struct set_recmode_state);
1530         bool status;
1531         int ret;
1532
1533         status = recover_recv(subreq, &ret);
1534         TALLOC_FREE(subreq);
1535         if (! status) {
1536                 substate->reply.status = ret;
1537                 substate->reply.errmsg = "recovery failed";
1538         } else {
1539                 substate->reply.status = 0;
1540                 substate->reply.errmsg = NULL;
1541         }
1542
1543         client_send_control(substate->req, &substate->header, &substate->reply);
1544         talloc_free(substate);
1545 }
1546
1547 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1548                                 struct tevent_req *req,
1549                                 struct ctdb_req_header *header,
1550                                 struct ctdb_req_control *request)
1551 {
1552         struct client_state *state = tevent_req_data(
1553                 req, struct client_state);
1554         struct tevent_req *subreq;
1555         struct ctdbd_context *ctdb = state->ctdb;
1556         struct set_recmode_state *substate;
1557         struct ctdb_reply_control reply;
1558
1559         reply.rdata.opcode = request->opcode;
1560
1561         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1562                 reply.status = -1;
1563                 reply.errmsg = "Client cannot set recmode to NORMAL";
1564                 goto fail;
1565         }
1566
1567         substate = talloc_zero(ctdb, struct set_recmode_state);
1568         if (substate == NULL) {
1569                 reply.status = -1;
1570                 reply.errmsg = "Memory error";
1571                 goto fail;
1572         }
1573
1574         substate->req = req;
1575         substate->ctdb = ctdb;
1576         substate->header = *header;
1577         substate->reply.rdata.opcode = request->opcode;
1578
1579         subreq = recover_send(substate, state->ev, state->ctdb);
1580         if (subreq == NULL) {
1581                 talloc_free(substate);
1582                 goto fail;
1583         }
1584         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1585
1586         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1587         return;
1588
1589 fail:
1590         client_send_control(req, header, &reply);
1591
1592 }
1593
1594 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1595 {
1596         printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
1597 }
1598
1599 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1600                                    struct tevent_req *req,
1601                                    struct ctdb_req_header *header,
1602                                    struct ctdb_req_control *request)
1603 {
1604         struct client_state *state = tevent_req_data(
1605                 req, struct client_state);
1606         struct ctdbd_context *ctdb = state->ctdb;
1607         struct ctdb_reply_control reply;
1608         int ret;
1609
1610         reply.rdata.opcode = request->opcode;
1611
1612         ret = srvid_register(ctdb->srv, state, request->srvid,
1613                              srvid_handler, state);
1614         if (ret != 0) {
1615                 reply.status = -1;
1616                 reply.errmsg = "Memory error";
1617                 goto fail;
1618         }
1619
1620         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
1621
1622         reply.status = 0;
1623         reply.errmsg = NULL;
1624
1625 fail:
1626         client_send_control(req, header, &reply);
1627 }
1628
1629 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1630                                      struct tevent_req *req,
1631                                      struct ctdb_req_header *header,
1632                                      struct ctdb_req_control *request)
1633 {
1634         struct client_state *state = tevent_req_data(
1635                 req, struct client_state);
1636         struct ctdbd_context *ctdb = state->ctdb;
1637         struct ctdb_reply_control reply;
1638         int ret;
1639
1640         reply.rdata.opcode = request->opcode;
1641
1642         ret = srvid_deregister(ctdb->srv, request->srvid, state);
1643         if (ret != 0) {
1644                 reply.status = -1;
1645                 reply.errmsg = "srvid not registered";
1646                 goto fail;
1647         }
1648
1649         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
1650
1651         reply.status = 0;
1652         reply.errmsg = NULL;
1653
1654 fail:
1655         client_send_control(req, header, &reply);
1656 }
1657
1658 static void control_get_dbname(TALLOC_CTX *mem_ctx,
1659                                struct tevent_req *req,
1660                                struct ctdb_req_header *header,
1661                                struct ctdb_req_control *request)
1662 {
1663         struct client_state *state = tevent_req_data(
1664                 req, struct client_state);
1665         struct ctdbd_context *ctdb = state->ctdb;
1666         struct ctdb_reply_control reply;
1667         struct database *db;
1668
1669         reply.rdata.opcode = request->opcode;
1670
1671         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1672         if (db == NULL) {
1673                 reply.status = ENOENT;
1674                 reply.errmsg = "Database not found";
1675         } else {
1676                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
1677                 if (reply.rdata.data.db_name == NULL) {
1678                         reply.status = ENOMEM;
1679                         reply.errmsg = "Memory error";
1680                 } else {
1681                         reply.status = 0;
1682                         reply.errmsg = NULL;
1683                 }
1684         }
1685
1686         client_send_control(req, header, &reply);
1687 }
1688
1689 static void control_get_pid(TALLOC_CTX *mem_ctx,
1690                             struct tevent_req *req,
1691                             struct ctdb_req_header *header,
1692                             struct ctdb_req_control *request)
1693 {
1694         struct ctdb_reply_control reply;
1695
1696         reply.rdata.opcode = request->opcode;
1697         reply.status = getpid();
1698         reply.errmsg = NULL;
1699
1700         client_send_control(req, header, &reply);
1701 }
1702
1703 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1704                                   struct tevent_req *req,
1705                                   struct ctdb_req_header *header,
1706                                   struct ctdb_req_control *request)
1707 {
1708         struct client_state *state = tevent_req_data(
1709                 req, struct client_state);
1710         struct ctdbd_context *ctdb = state->ctdb;
1711         struct ctdb_reply_control reply;
1712
1713         reply.rdata.opcode = request->opcode;
1714         reply.status = ctdb->node_map->recmaster;
1715         reply.errmsg = NULL;
1716
1717         client_send_control(req, header, &reply);
1718 }
1719
1720 static void control_get_pnn(TALLOC_CTX *mem_ctx,
1721                             struct tevent_req *req,
1722                             struct ctdb_req_header *header,
1723                             struct ctdb_req_control *request)
1724 {
1725         struct ctdb_reply_control reply;
1726
1727         reply.rdata.opcode = request->opcode;
1728         reply.status = header->destnode;
1729         reply.errmsg = NULL;
1730
1731         client_send_control(req, header, &reply);
1732 }
1733
1734 static void control_shutdown(TALLOC_CTX *mem_ctx,
1735                              struct tevent_req *req,
1736                              struct ctdb_req_header *hdr,
1737                              struct ctdb_req_control *request)
1738 {
1739         struct client_state *state = tevent_req_data(
1740                 req, struct client_state);
1741
1742         state->status = 99;
1743 }
1744
1745 static void control_set_tunable(TALLOC_CTX *mem_ctx,
1746                                 struct tevent_req *req,
1747                                 struct ctdb_req_header *header,
1748                                 struct ctdb_req_control *request)
1749 {
1750         struct client_state *state = tevent_req_data(
1751                 req, struct client_state);
1752         struct ctdbd_context *ctdb = state->ctdb;
1753         struct ctdb_reply_control reply;
1754         bool ret, obsolete;
1755
1756         reply.rdata.opcode = request->opcode;
1757         reply.errmsg = NULL;
1758
1759         ret = ctdb_tunable_set_value(&ctdb->tun_list,
1760                                      request->rdata.data.tunable->name,
1761                                      request->rdata.data.tunable->value,
1762                                      &obsolete);
1763         if (! ret) {
1764                 reply.status = -1;
1765         } else if (obsolete) {
1766                 reply.status = 1;
1767         } else {
1768                 reply.status = 0;
1769         }
1770
1771         client_send_control(req, header, &reply);
1772 }
1773
1774 static void control_get_tunable(TALLOC_CTX *mem_ctx,
1775                                 struct tevent_req *req,
1776                                 struct ctdb_req_header *header,
1777                                 struct ctdb_req_control *request)
1778 {
1779         struct client_state *state = tevent_req_data(
1780                 req, struct client_state);
1781         struct ctdbd_context *ctdb = state->ctdb;
1782         struct ctdb_reply_control reply;
1783         uint32_t value;
1784         bool ret;
1785
1786         reply.rdata.opcode = request->opcode;
1787         reply.errmsg = NULL;
1788
1789         ret = ctdb_tunable_get_value(&ctdb->tun_list,
1790                                      request->rdata.data.tun_var, &value);
1791         if (! ret) {
1792                 reply.status = -1;
1793         } else {
1794                 reply.rdata.data.tun_value = value;
1795                 reply.status = 0;
1796         }
1797
1798         client_send_control(req, header, &reply);
1799 }
1800
1801 static void control_list_tunables(TALLOC_CTX *mem_ctx,
1802                                   struct tevent_req *req,
1803                                   struct ctdb_req_header *header,
1804                                   struct ctdb_req_control *request)
1805 {
1806         struct ctdb_reply_control reply;
1807         struct ctdb_var_list *var_list;
1808
1809         reply.rdata.opcode = request->opcode;
1810         reply.errmsg = NULL;
1811
1812         var_list = ctdb_tunable_names(mem_ctx);
1813         if (var_list == NULL) {
1814                 reply.status = -1;
1815         } else {
1816                 reply.rdata.data.tun_var_list = var_list;
1817                 reply.status = 0;
1818         }
1819
1820         client_send_control(req, header, &reply);
1821 }
1822
1823 static void control_modify_flags(TALLOC_CTX *mem_ctx,
1824                                  struct tevent_req *req,
1825                                  struct ctdb_req_header *header,
1826                                  struct ctdb_req_control *request)
1827 {
1828         struct client_state *state = tevent_req_data(
1829                 req, struct client_state);
1830         struct ctdbd_context *ctdb = state->ctdb;
1831         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
1832         struct ctdb_reply_control reply;
1833         struct node *node;
1834
1835         reply.rdata.opcode = request->opcode;
1836
1837         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
1838             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1839                 DEBUG(DEBUG_INFO,
1840                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
1841                 reply.status = EINVAL;
1842                 reply.errmsg = "Failed to MODIFY_FLAGS";
1843                 client_send_control(req, header, &reply);
1844                 return;
1845         }
1846
1847         /* There's all sorts of broadcast weirdness here.  Only change
1848          * the specified node, not the destination node of the
1849          * control. */
1850         node = &ctdb->node_map->node[change->pnn];
1851
1852         if ((node->flags &
1853              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
1854             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1855                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
1856                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
1857                 goto done;
1858         }
1859
1860         if ((node->flags &
1861              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
1862             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
1863                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
1864                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
1865                 goto done;
1866         }
1867
1868         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
1869
1870 done:
1871         reply.status = 0;
1872         reply.errmsg = NULL;
1873         client_send_control(req, header, &reply);
1874 }
1875
1876 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
1877                                      struct tevent_req *req,
1878                                      struct ctdb_req_header *header,
1879                                      struct ctdb_req_control *request)
1880 {
1881         struct client_state *state = tevent_req_data(
1882                 req, struct client_state);
1883         struct ctdbd_context *ctdb = state->ctdb;
1884         struct ctdb_reply_control reply;
1885
1886         reply.rdata.opcode = request->opcode;
1887         reply.rdata.data.tun_list = &ctdb->tun_list;
1888         reply.status = 0;
1889         reply.errmsg = NULL;
1890
1891         client_send_control(req, header, &reply);
1892 }
1893
1894 static void control_uptime(TALLOC_CTX *mem_ctx,
1895                            struct tevent_req *req,
1896                            struct ctdb_req_header *header,
1897                            struct ctdb_req_control *request)
1898 {
1899         struct client_state *state = tevent_req_data(
1900                 req, struct client_state);
1901         struct ctdbd_context *ctdb = state->ctdb;
1902         struct ctdb_reply_control reply;
1903         struct ctdb_uptime *uptime;;
1904
1905         reply.rdata.opcode = request->opcode;
1906
1907         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
1908         if (uptime == NULL) {
1909                 goto fail;
1910         }
1911
1912         uptime->current_time = tevent_timeval_current();
1913         uptime->ctdbd_start_time = ctdb->start_time;
1914         uptime->last_recovery_started = ctdb->recovery_start_time;
1915         uptime->last_recovery_finished = ctdb->recovery_end_time;
1916
1917         reply.rdata.data.uptime = uptime;
1918         reply.status = 0;
1919         reply.errmsg = NULL;
1920         client_send_control(req, header, &reply);
1921         return;
1922
1923 fail:
1924         reply.status = -1;
1925         reply.errmsg = "Memory error";
1926         client_send_control(req, header, &reply);
1927 }
1928
1929 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
1930                                       struct tevent_req *req,
1931                                       struct ctdb_req_header *header,
1932                                       struct ctdb_req_control *request)
1933 {
1934         struct client_state *state = tevent_req_data(
1935                 req, struct client_state);
1936         struct ctdbd_context *ctdb = state->ctdb;
1937         struct ctdb_reply_control reply;
1938         struct ctdb_node_map *nodemap;
1939         struct node_map *node_map = ctdb->node_map;
1940         int i;
1941
1942         reply.rdata.opcode = request->opcode;
1943
1944         nodemap = read_nodes_file(mem_ctx, header->destnode);
1945         if (nodemap == NULL) {
1946                 goto fail;
1947         }
1948
1949         for (i=0; i<nodemap->num; i++) {
1950                 struct node *node;
1951
1952                 if (i < node_map->num_nodes &&
1953                     ctdb_sock_addr_same(&nodemap->node[i].addr,
1954                                         &node_map->node[i].addr)) {
1955                         continue;
1956                 }
1957
1958                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
1959                         node = &node_map->node[i];
1960
1961                         node->flags |= NODE_FLAGS_DELETED;
1962                         parse_ip("0.0.0.0", NULL, 0, &node->addr);
1963
1964                         continue;
1965                 }
1966
1967                 if (i < node_map->num_nodes &&
1968                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
1969                         node = &node_map->node[i];
1970
1971                         node->flags &= ~NODE_FLAGS_DELETED;
1972                         node->addr = nodemap->node[i].addr;
1973
1974                         continue;
1975                 }
1976
1977                 node_map->node = talloc_realloc(node_map, node_map->node,
1978                                                 struct node,
1979                                                 node_map->num_nodes+1);
1980                 if (node_map->node == NULL) {
1981                         goto fail;
1982                 }
1983                 node = &node_map->node[node_map->num_nodes];
1984
1985                 node->addr = nodemap->node[i].addr;
1986                 node->pnn = nodemap->node[i].pnn;
1987                 node->flags = 0;
1988                 node->capabilities = CTDB_CAP_DEFAULT;
1989                 node->recovery_disabled = false;
1990                 node->recovery_substate = NULL;
1991
1992                 node_map->num_nodes += 1;
1993         }
1994
1995         talloc_free(nodemap);
1996
1997         reply.status = 0;
1998         reply.errmsg = NULL;
1999         client_send_control(req, header, &reply);
2000         return;
2001
2002 fail:
2003         reply.status = -1;
2004         reply.errmsg = "Memory error";
2005         client_send_control(req, header, &reply);
2006 }
2007
2008 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2009                                      struct tevent_req *req,
2010                                      struct ctdb_req_header *header,
2011                                      struct ctdb_req_control *request)
2012 {
2013         struct client_state *state = tevent_req_data(
2014                 req, struct client_state);
2015         struct ctdbd_context *ctdb = state->ctdb;
2016         struct ctdb_reply_control reply;
2017         struct node *node;
2018         uint32_t caps = 0;
2019
2020         reply.rdata.opcode = request->opcode;
2021
2022         node = &ctdb->node_map->node[header->destnode];
2023         caps = node->capabilities;
2024
2025         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2026                 /* Don't send reply */
2027                 return;
2028         }
2029
2030         reply.rdata.data.caps = caps;
2031         reply.status = 0;
2032         reply.errmsg = NULL;
2033
2034         client_send_control(req, header, &reply);
2035 }
2036
2037 static void control_release_ip(TALLOC_CTX *mem_ctx,
2038                                struct tevent_req *req,
2039                                struct ctdb_req_header *header,
2040                                struct ctdb_req_control *request)
2041 {
2042         struct client_state *state = tevent_req_data(
2043                 req, struct client_state);
2044         struct ctdbd_context *ctdb = state->ctdb;
2045         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2046         struct ctdb_reply_control reply;
2047         struct ctdb_public_ip_list *ips = NULL;
2048         struct ctdb_public_ip *t = NULL;
2049         int i;
2050
2051         reply.rdata.opcode = request->opcode;
2052
2053         if (ctdb->known_ips == NULL) {
2054                 D_INFO("RELEASE_IP %s - not a public IP\n",
2055                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2056                 goto done;
2057         }
2058
2059         ips = &ctdb->known_ips[header->destnode];
2060
2061         t = NULL;
2062         for (i = 0; i < ips->num; i++) {
2063                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2064                         t = &ips->ip[i];
2065                         break;
2066                 }
2067         }
2068         if (t == NULL) {
2069                 D_INFO("RELEASE_IP %s - not a public IP\n",
2070                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2071                 goto done;
2072         }
2073
2074         if (t->pnn != header->destnode) {
2075                 if (header->destnode == ip->pnn) {
2076                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2077                               ctdb_sock_addr_to_string(mem_ctx,
2078                                                        &ip->addr, false),
2079                               ip->pnn);
2080                         reply.status = -1;
2081                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2082                         client_send_control(req, header, &reply);
2083                         return;
2084                 }
2085
2086                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2087                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2088                        ip->pnn);
2089                 t->pnn = ip->pnn;
2090         } else {
2091                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2092                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2093                           ip->pnn);
2094                 t->pnn = ip->pnn;
2095         }
2096
2097 done:
2098         reply.status = 0;
2099         reply.errmsg = NULL;
2100         client_send_control(req, header, &reply);
2101 }
2102
2103 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2104                                 struct tevent_req *req,
2105                                 struct ctdb_req_header *header,
2106                                 struct ctdb_req_control *request)
2107 {
2108         struct client_state *state = tevent_req_data(
2109                 req, struct client_state);
2110         struct ctdbd_context *ctdb = state->ctdb;
2111         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2112         struct ctdb_reply_control reply;
2113         struct ctdb_public_ip_list *ips = NULL;
2114         struct ctdb_public_ip *t = NULL;
2115         int i;
2116
2117         reply.rdata.opcode = request->opcode;
2118
2119         if (ctdb->known_ips == NULL) {
2120                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2121                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2122                 goto done;
2123         }
2124
2125         ips = &ctdb->known_ips[header->destnode];
2126
2127         t = NULL;
2128         for (i = 0; i < ips->num; i++) {
2129                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2130                         t = &ips->ip[i];
2131                         break;
2132                 }
2133         }
2134         if (t == NULL) {
2135                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2136                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2137                 goto done;
2138         }
2139
2140         if (t->pnn == header->destnode) {
2141                 D_INFO("TAKEOVER_IP %s - redundant\n",
2142                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2143         } else {
2144                 D_NOTICE("TAKEOVER_IP %s\n",
2145                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2146                 t->pnn = ip->pnn;
2147         }
2148
2149 done:
2150         reply.status = 0;
2151         reply.errmsg = NULL;
2152         client_send_control(req, header, &reply);
2153 }
2154
2155 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2156                                    struct tevent_req *req,
2157                                    struct ctdb_req_header *header,
2158                                    struct ctdb_req_control *request)
2159 {
2160         struct client_state *state = tevent_req_data(
2161                 req, struct client_state);
2162         struct ctdbd_context *ctdb = state->ctdb;
2163         struct ctdb_reply_control reply;
2164         struct ctdb_public_ip_list *ips = NULL;
2165
2166         reply.rdata.opcode = request->opcode;
2167
2168         if (ctdb->known_ips == NULL) {
2169                 /* No IPs defined so create a dummy empty struct and ship it */
2170                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2171                 if (ips == NULL) {
2172                         reply.status = ENOMEM;
2173                         reply.errmsg = "Memory error";
2174                         goto done;
2175                 }
2176                 goto ok;
2177         }
2178
2179         ips = &ctdb->known_ips[header->destnode];
2180
2181         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2182                 /* If runstate is not RUNNING or a node is then return
2183                  * no available IPs.  Don't worry about interface
2184                  * states here - we're not faking down to that level.
2185                  */
2186                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
2187                         /* No available IPs: return dummy empty struct */
2188                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2189                         if (ips == NULL) {
2190                                 reply.status = ENOMEM;
2191                                 reply.errmsg = "Memory error";
2192                                 goto done;
2193                         }
2194                 }
2195         }
2196
2197 ok:
2198         reply.rdata.data.pubip_list = ips;
2199         reply.status = 0;
2200         reply.errmsg = NULL;
2201
2202 done:
2203         client_send_control(req, header, &reply);
2204 }
2205
2206 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2207                                 struct tevent_req *req,
2208                                 struct ctdb_req_header *header,
2209                                 struct ctdb_req_control *request)
2210 {
2211         struct client_state *state = tevent_req_data(
2212                 req, struct client_state);
2213         struct ctdbd_context *ctdb = state->ctdb;
2214         struct ctdb_reply_control reply;
2215         struct ctdb_node_map *nodemap;
2216         struct node *node;
2217         int i;
2218
2219         reply.rdata.opcode = request->opcode;
2220
2221         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2222         if (nodemap == NULL) {
2223                 goto fail;
2224         }
2225
2226         nodemap->num = ctdb->node_map->num_nodes;
2227         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2228                                      nodemap->num);
2229         if (nodemap->node == NULL) {
2230                 goto fail;
2231         }
2232
2233         for (i=0; i<nodemap->num; i++) {
2234                 node = &ctdb->node_map->node[i];
2235                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2236                         .pnn = node->pnn,
2237                         .flags = node->flags,
2238                         .addr = node->addr,
2239                 };
2240         }
2241
2242         reply.rdata.data.nodemap = nodemap;
2243         reply.status = 0;
2244         reply.errmsg = NULL;
2245         client_send_control(req, header, &reply);
2246         return;
2247
2248 fail:
2249         reply.status = -1;
2250         reply.errmsg = "Memory error";
2251         client_send_control(req, header, &reply);
2252 }
2253
2254 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2255                                      struct tevent_req *req,
2256                                      struct ctdb_req_header *header,
2257                                      struct ctdb_req_control *request)
2258 {
2259         struct client_state *state = tevent_req_data(
2260                 req, struct client_state);
2261         struct ctdbd_context *ctdb = state->ctdb;
2262         struct ctdb_reply_control reply;
2263
2264         reply.rdata.opcode = request->opcode;
2265
2266         if (ctdb->reclock != NULL) {
2267                 reply.rdata.data.reclock_file =
2268                         talloc_strdup(mem_ctx, ctdb->reclock);
2269                 if (reply.rdata.data.reclock_file == NULL) {
2270                         reply.status = ENOMEM;
2271                         reply.errmsg = "Memory error";
2272                         goto done;
2273                 }
2274         } else {
2275                 reply.rdata.data.reclock_file = NULL;
2276         }
2277
2278         reply.status = 0;
2279         reply.errmsg = NULL;
2280
2281 done:
2282         client_send_control(req, header, &reply);
2283 }
2284
2285 static void control_stop_node(TALLOC_CTX *mem_ctx,
2286                               struct tevent_req *req,
2287                               struct ctdb_req_header *header,
2288                               struct ctdb_req_control *request)
2289 {
2290         struct client_state *state = tevent_req_data(
2291                 req, struct client_state);
2292         struct ctdbd_context *ctdb = state->ctdb;
2293         struct ctdb_reply_control reply;
2294
2295         reply.rdata.opcode = request->opcode;
2296
2297         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2298         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2299
2300         reply.status = 0;
2301         reply.errmsg = NULL;
2302
2303         client_send_control(req, header, &reply);
2304         return;
2305 }
2306
2307 static void control_continue_node(TALLOC_CTX *mem_ctx,
2308                                   struct tevent_req *req,
2309                                   struct ctdb_req_header *header,
2310                                   struct ctdb_req_control *request)
2311 {
2312         struct client_state *state = tevent_req_data(
2313                 req, struct client_state);
2314         struct ctdbd_context *ctdb = state->ctdb;
2315         struct ctdb_reply_control reply;
2316
2317         reply.rdata.opcode = request->opcode;
2318
2319         DEBUG(DEBUG_INFO, ("Continue node\n"));
2320         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2321
2322         reply.status = 0;
2323         reply.errmsg = NULL;
2324
2325         client_send_control(req, header, &reply);
2326         return;
2327 }
2328
2329 static void set_ban_state_callback(struct tevent_req *subreq)
2330 {
2331         struct node *node = tevent_req_callback_data(
2332                 subreq, struct node);
2333         bool status;
2334
2335         status = tevent_wakeup_recv(subreq);
2336         TALLOC_FREE(subreq);
2337         if (! status) {
2338                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2339         }
2340
2341         node->flags &= ~NODE_FLAGS_BANNED;
2342 }
2343
2344 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2345                                   struct tevent_req *req,
2346                                   struct ctdb_req_header *header,
2347                                   struct ctdb_req_control *request)
2348 {
2349         struct client_state *state = tevent_req_data(
2350                 req, struct client_state);
2351         struct tevent_req *subreq;
2352         struct ctdbd_context *ctdb = state->ctdb;
2353         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2354         struct ctdb_reply_control reply;
2355         struct node *node;
2356
2357         reply.rdata.opcode = request->opcode;
2358
2359         if (ban->pnn != header->destnode) {
2360                 DEBUG(DEBUG_INFO,
2361                       ("SET_BAN_STATE control for PNN %d rejected\n",
2362                        ban->pnn));
2363                 reply.status = EINVAL;
2364                 goto fail;
2365         }
2366
2367         node = &ctdb->node_map->node[header->destnode];
2368
2369         if (ban->time == 0) {
2370                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2371                 node->flags &= ~NODE_FLAGS_BANNED;
2372                 goto done;
2373         }
2374
2375         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2376                                     tevent_timeval_current_ofs(
2377                                             ban->time, 0));
2378         if (subreq == NULL) {
2379                 reply.status = ENOMEM;
2380                 goto fail;
2381         }
2382         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2383
2384         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2385         node->flags |= NODE_FLAGS_BANNED;
2386         ctdb->vnn_map->generation = INVALID_GENERATION;
2387
2388 done:
2389         reply.status = 0;
2390         reply.errmsg = NULL;
2391
2392         client_send_control(req, header, &reply);
2393         return;
2394
2395 fail:
2396         reply.errmsg = "Failed to ban node";
2397 }
2398
2399 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2400                                struct tevent_req *req,
2401                                struct ctdb_req_header *header,
2402                                struct ctdb_req_control *request)
2403 {
2404         struct client_state *state = tevent_req_data(
2405                 req, struct client_state);
2406         struct ctdbd_context *ctdb = state->ctdb;
2407         struct ctdb_reply_control reply;
2408         struct database *db;
2409
2410         reply.rdata.opcode = request->opcode;
2411
2412         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2413         if (db == NULL) {
2414                 reply.status = ENOENT;
2415                 reply.errmsg = "Database not found";
2416         } else {
2417                 reply.rdata.data.seqnum = db->seq_num;
2418                 reply.status = 0;
2419                 reply.errmsg = NULL;
2420         }
2421
2422         client_send_control(req, header, &reply);
2423 }
2424
2425 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2426                                   struct tevent_req *req,
2427                                   struct ctdb_req_header *header,
2428                                   struct ctdb_req_control *request)
2429 {
2430         struct client_state *state = tevent_req_data(
2431                 req, struct client_state);
2432         struct ctdbd_context *ctdb = state->ctdb;
2433         struct ctdb_reply_control reply;
2434         struct database *db;
2435
2436         reply.rdata.opcode = request->opcode;
2437
2438         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2439         if (db == NULL) {
2440                 reply.status = ENOENT;
2441                 reply.errmsg = "Database not found";
2442         } else {
2443                 reply.rdata.data.reason = NULL;
2444                 reply.status = 0;
2445                 reply.errmsg = NULL;
2446         }
2447
2448         client_send_control(req, header, &reply);
2449 }
2450
2451 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
2452                                                    struct ctdbd_context *ctdb)
2453 {
2454         struct ctdb_iface_list *iface_list;
2455         struct interface *iface;
2456         int i;
2457
2458         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2459         if (iface_list == NULL) {
2460                 goto done;
2461         }
2462
2463         iface_list->num = ctdb->iface_map->num;
2464         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2465                                          iface_list->num);
2466         if (iface_list->iface == NULL) {
2467                 TALLOC_FREE(iface_list);
2468                 goto done;
2469         }
2470
2471         for (i=0; i<iface_list->num; i++) {
2472                 iface = &ctdb->iface_map->iface[i];
2473                 iface_list->iface[i] = (struct ctdb_iface) {
2474                         .link_state = iface->link_up,
2475                         .references = iface->references,
2476                 };
2477                 strlcpy(iface_list->iface[i].name, iface->name,
2478                         sizeof(iface_list->iface[i].name));
2479         }
2480
2481 done:
2482         return iface_list;
2483 }
2484
2485 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
2486                                        struct tevent_req *req,
2487                                        struct ctdb_req_header *header,
2488                                        struct ctdb_req_control *request)
2489 {
2490         struct client_state *state = tevent_req_data(
2491                 req, struct client_state);
2492         struct ctdbd_context *ctdb = state->ctdb;
2493         struct ctdb_reply_control reply;
2494         ctdb_sock_addr *addr = request->rdata.data.addr;
2495         struct ctdb_public_ip_list *known = NULL;
2496         struct ctdb_public_ip_info *info = NULL;
2497         unsigned i;
2498
2499         reply.rdata.opcode = request->opcode;
2500
2501         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
2502         if (info == NULL) {
2503                 reply.status = ENOMEM;
2504                 reply.errmsg = "Memory error";
2505                 goto done;
2506         }
2507
2508         reply.rdata.data.ipinfo = info;
2509
2510         if (ctdb->known_ips != NULL) {
2511                 known = &ctdb->known_ips[header->destnode];
2512         } else {
2513                 /* No IPs defined so create a dummy empty struct and
2514                  * fall through.  The given IP won't be matched
2515                  * below...
2516                  */
2517                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2518                 if (known == NULL) {
2519                         reply.status = ENOMEM;
2520                         reply.errmsg = "Memory error";
2521                         goto done;
2522                 }
2523         }
2524
2525         for (i = 0; i < known->num; i++) {
2526                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
2527                                            addr)) {
2528                         break;
2529                 }
2530         }
2531
2532         if (i == known->num) {
2533                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
2534                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
2535                 reply.status = -1;
2536                 reply.errmsg = "Unknown address";
2537                 goto done;
2538         }
2539
2540         info->ip = known->ip[i];
2541
2542         /* The fake PUBLICIPS stanza and resulting known_ips data
2543          * don't know anything about interfaces, so completely fake
2544          * this.
2545          */
2546         info->active_idx = 0;
2547
2548         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
2549         if (info->ifaces == NULL) {
2550                 reply.status = ENOMEM;
2551                 reply.errmsg = "Memory error";
2552                 goto done;
2553         }
2554
2555         reply.status = 0;
2556         reply.errmsg = NULL;
2557
2558 done:
2559         client_send_control(req, header, &reply);
2560 }
2561
2562 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
2563                                struct tevent_req *req,
2564                                struct ctdb_req_header *header,
2565                                struct ctdb_req_control *request)
2566 {
2567         struct client_state *state = tevent_req_data(
2568                 req, struct client_state);
2569         struct ctdbd_context *ctdb = state->ctdb;
2570         struct ctdb_reply_control reply;
2571         struct ctdb_iface_list *iface_list;
2572
2573         reply.rdata.opcode = request->opcode;
2574
2575         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
2576         if (iface_list == NULL) {
2577                 goto fail;
2578         }
2579
2580         reply.rdata.data.iface_list = iface_list;
2581         reply.status = 0;
2582         reply.errmsg = NULL;
2583         client_send_control(req, header, &reply);
2584         return;
2585
2586 fail:
2587         reply.status = -1;
2588         reply.errmsg = "Memory error";
2589         client_send_control(req, header, &reply);
2590 }
2591
2592 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
2593                                          struct tevent_req *req,
2594                                          struct ctdb_req_header *header,
2595                                          struct ctdb_req_control *request)
2596 {
2597         struct client_state *state = tevent_req_data(
2598                 req, struct client_state);
2599         struct ctdbd_context *ctdb = state->ctdb;
2600         struct ctdb_reply_control reply;
2601         struct ctdb_iface *in_iface;
2602         struct interface *iface = NULL;
2603         bool link_up = false;
2604         int i;
2605
2606         reply.rdata.opcode = request->opcode;
2607
2608         in_iface = request->rdata.data.iface;
2609
2610         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
2611                 reply.errmsg = "interface name not terminated";
2612                 goto fail;
2613         }
2614
2615         switch (in_iface->link_state) {
2616                 case 0:
2617                         link_up = false;
2618                         break;
2619
2620                 case 1:
2621                         link_up = true;
2622                         break;
2623
2624                 default:
2625                         reply.errmsg = "invalid link state";
2626                         goto fail;
2627         }
2628
2629         if (in_iface->references != 0) {
2630                 reply.errmsg = "references should be 0";
2631                 goto fail;
2632         }
2633
2634         for (i=0; i<ctdb->iface_map->num; i++) {
2635                 if (strcmp(ctdb->iface_map->iface[i].name,
2636                            in_iface->name) == 0) {
2637                         iface = &ctdb->iface_map->iface[i];
2638                         break;
2639                 }
2640         }
2641
2642         if (iface == NULL) {
2643                 reply.errmsg = "interface not found";
2644                 goto fail;
2645         }
2646
2647         iface->link_up = link_up;
2648
2649         reply.status = 0;
2650         reply.errmsg = NULL;
2651         client_send_control(req, header, &reply);
2652         return;
2653
2654 fail:
2655         reply.status = -1;
2656         client_send_control(req, header, &reply);
2657 }
2658
2659 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
2660                                     struct tevent_req *req,
2661                                     struct ctdb_req_header *header,
2662                                     struct ctdb_req_control *request)
2663 {
2664         struct client_state *state = tevent_req_data(
2665                 req, struct client_state);
2666         struct ctdbd_context *ctdb = state->ctdb;
2667         struct ctdb_reply_control reply;
2668         struct database *db;
2669
2670         reply.rdata.opcode = request->opcode;
2671
2672         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2673         if (db == NULL) {
2674                 reply.status = ENOENT;
2675                 reply.errmsg = "Database not found";
2676                 goto done;
2677         }
2678
2679         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2680                 reply.status = EINVAL;
2681                 reply.errmsg = "Can not set READONLY on persistent db";
2682                 goto done;
2683         }
2684
2685         db->flags |= CTDB_DB_FLAGS_READONLY;
2686         reply.status = 0;
2687         reply.errmsg = NULL;
2688
2689 done:
2690         client_send_control(req, header, &reply);
2691 }
2692
2693 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
2694                                     struct tevent_req *req,
2695                                     struct ctdb_req_header *header,
2696                                     struct ctdb_req_control *request)
2697 {
2698         struct client_state *state = tevent_req_data(
2699                 req, struct client_state);
2700         struct ctdbd_context *ctdb = state->ctdb;
2701         struct ctdb_reply_control reply;
2702         struct database *db;
2703
2704         reply.rdata.opcode = request->opcode;
2705
2706         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2707         if (db == NULL) {
2708                 reply.status = ENOENT;
2709                 reply.errmsg = "Database not found";
2710                 goto done;
2711         }
2712
2713         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2714                 reply.status = EINVAL;
2715                 reply.errmsg = "Can not set STICKY on persistent db";
2716                 goto done;
2717         }
2718
2719         db->flags |= CTDB_DB_FLAGS_STICKY;
2720         reply.status = 0;
2721         reply.errmsg = NULL;
2722
2723 done:
2724         client_send_control(req, header, &reply);
2725 }
2726
2727 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
2728                                   struct tevent_req *req,
2729                                   struct ctdb_req_header *header,
2730                                   struct ctdb_req_control *request)
2731 {
2732         struct ctdb_reply_control reply;
2733
2734         /* Always succeed */
2735         reply.rdata.opcode = request->opcode;
2736         reply.status = 0;
2737         reply.errmsg = NULL;
2738
2739         client_send_control(req, header, &reply);
2740 }
2741
2742 static void control_get_runstate(TALLOC_CTX *mem_ctx,
2743                                  struct tevent_req *req,
2744                                  struct ctdb_req_header *header,
2745                                  struct ctdb_req_control *request)
2746 {
2747         struct client_state *state = tevent_req_data(
2748                 req, struct client_state);
2749         struct ctdbd_context *ctdb = state->ctdb;
2750         struct ctdb_reply_control reply;
2751
2752         reply.rdata.opcode = request->opcode;
2753         reply.rdata.data.runstate = ctdb->runstate;
2754         reply.status = 0;
2755         reply.errmsg = NULL;
2756
2757         client_send_control(req, header, &reply);
2758 }
2759
2760 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
2761                                    struct tevent_req *req,
2762                                    struct ctdb_req_header *header,
2763                                    struct ctdb_req_control *request)
2764 {
2765         struct ctdb_reply_control reply;
2766         struct ctdb_node_map *nodemap;
2767
2768         reply.rdata.opcode = request->opcode;
2769
2770         nodemap = read_nodes_file(mem_ctx, header->destnode);
2771         if (nodemap == NULL) {
2772                 goto fail;
2773         }
2774
2775         reply.rdata.data.nodemap = nodemap;
2776         reply.status = 0;
2777         reply.errmsg = NULL;
2778         client_send_control(req, header, &reply);
2779         return;
2780
2781 fail:
2782         reply.status = -1;
2783         reply.errmsg = "Failed to read nodes file";
2784         client_send_control(req, header, &reply);
2785 }
2786
2787 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
2788                                     struct tevent_req *req,
2789                                     struct ctdb_req_header *header,
2790                                     struct ctdb_req_control *request)
2791 {
2792         struct client_state *state = tevent_req_data(
2793                 req, struct client_state);
2794         struct ctdbd_context *ctdb = state->ctdb;
2795         struct ctdb_client *client;
2796         struct client_state *cstate;
2797         struct ctdb_reply_control reply;
2798         bool pid_found, srvid_found;
2799         int ret;
2800
2801         reply.rdata.opcode = request->opcode;
2802
2803         pid_found = false;
2804         srvid_found = false;
2805
2806         for (client=ctdb->client_list; client != NULL; client=client->next) {
2807                 if (client->pid == request->rdata.data.pid_srvid->pid) {
2808                         pid_found = true;
2809                         cstate = (struct client_state *)client->state;
2810                         ret = srvid_exists(ctdb->srv,
2811                                            request->rdata.data.pid_srvid->srvid,
2812                                            cstate);
2813                         if (ret == 0) {
2814                                 srvid_found = true;
2815                                 ret = kill(cstate->pid, 0);
2816                                 if (ret != 0) {
2817                                         reply.status = ret;
2818                                         reply.errmsg = strerror(errno);
2819                                 } else {
2820                                         reply.status = 0;
2821                                         reply.errmsg = NULL;
2822                                 }
2823                         }
2824                 }
2825         }
2826
2827         if (! pid_found) {
2828                 reply.status = -1;
2829                 reply.errmsg = "No client for PID";
2830         } else if (! srvid_found) {
2831                 reply.status = -1;
2832                 reply.errmsg = "No client for PID and SRVID";
2833         }
2834
2835         client_send_control(req, header, &reply);
2836 }
2837
2838 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
2839                                  struct tevent_req *req,
2840                                  struct ctdb_req_header *header,
2841                                  struct ctdb_req_control *request)
2842 {
2843         struct client_state *state = tevent_req_data(
2844                 req, struct client_state);
2845         struct ctdbd_context *ctdb = state->ctdb;
2846         struct ctdb_reply_control reply;
2847         struct fake_control_failure *f = NULL;
2848
2849         D_DEBUG("Checking fake control failure for control %u on node %u\n",
2850                 request->opcode, header->destnode);
2851         for (f = ctdb->control_failures; f != NULL; f = f->next) {
2852                 if (f->opcode == request->opcode &&
2853                     (f->pnn == header->destnode ||
2854                      f->pnn == CTDB_UNKNOWN_PNN)) {
2855
2856                         reply.rdata.opcode = request->opcode;
2857                         if (strcmp(f->error, "TIMEOUT") == 0) {
2858                                 /* Causes no reply */
2859                                 D_ERR("Control %u fake timeout on node %u\n",
2860                                       request->opcode, header->destnode);
2861                                 return true;
2862                         } else if (strcmp(f->error, "ERROR") == 0) {
2863                                 D_ERR("Control %u fake error on node %u\n",
2864                                       request->opcode, header->destnode);
2865                                 reply.status = -1;
2866                                 reply.errmsg = f->comment;
2867                                 client_send_control(req, header, &reply);
2868                                 return true;
2869                         }
2870                 }
2871         }
2872
2873         return false;
2874 }
2875
2876 static void control_error(TALLOC_CTX *mem_ctx,
2877                           struct tevent_req *req,
2878                           struct ctdb_req_header *header,
2879                           struct ctdb_req_control *request)
2880 {
2881         struct ctdb_reply_control reply;
2882
2883         reply.rdata.opcode = request->opcode;
2884         reply.status = -1;
2885         reply.errmsg = "Not implemented";
2886
2887         client_send_control(req, header, &reply);
2888 }
2889
2890 /*
2891  * Handling protocol - messages
2892  */
2893
2894 struct disable_recoveries_state {
2895         struct node *node;
2896 };
2897
2898 static void disable_recoveries_callback(struct tevent_req *subreq)
2899 {
2900         struct disable_recoveries_state *substate = tevent_req_callback_data(
2901                 subreq, struct disable_recoveries_state);
2902         bool status;
2903
2904         status = tevent_wakeup_recv(subreq);
2905         TALLOC_FREE(subreq);
2906         if (! status) {
2907                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2908         }
2909
2910         substate->node->recovery_disabled = false;
2911         TALLOC_FREE(substate->node->recovery_substate);
2912 }
2913
2914 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
2915                                        struct tevent_req *req,
2916                                        struct ctdb_req_header *header,
2917                                        struct ctdb_req_message *request)
2918 {
2919         struct client_state *state = tevent_req_data(
2920                 req, struct client_state);
2921         struct tevent_req *subreq;
2922         struct ctdbd_context *ctdb = state->ctdb;
2923         struct disable_recoveries_state *substate;
2924         struct ctdb_disable_message *disable = request->data.disable;
2925         struct ctdb_req_message_data reply;
2926         struct node *node;
2927         int ret = -1;
2928         TDB_DATA data;
2929
2930         node = &ctdb->node_map->node[header->destnode];
2931
2932         if (disable->timeout == 0) {
2933                 TALLOC_FREE(node->recovery_substate);
2934                 node->recovery_disabled = false;
2935                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
2936                                    header->destnode));
2937                 goto done;
2938         }
2939
2940         substate = talloc_zero(ctdb->node_map,
2941                                struct disable_recoveries_state);
2942         if (substate == NULL) {
2943                 goto fail;
2944         }
2945
2946         substate->node = node;
2947
2948         subreq = tevent_wakeup_send(substate, state->ev,
2949                                     tevent_timeval_current_ofs(
2950                                             disable->timeout, 0));
2951         if (subreq == NULL) {
2952                 talloc_free(substate);
2953                 goto fail;
2954         }
2955         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
2956
2957         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
2958                            disable->timeout, header->destnode));
2959         node->recovery_substate = substate;
2960         node->recovery_disabled = true;
2961
2962 done:
2963         ret = header->destnode;
2964
2965 fail:
2966         reply.srvid = disable->srvid;
2967         data.dptr = (uint8_t *)&ret;
2968         data.dsize = sizeof(int);
2969         reply.data = data;
2970
2971         client_send_message(req, header, &reply);
2972 }
2973
2974 static void message_takeover_run(TALLOC_CTX *mem_ctx,
2975                                  struct tevent_req *req,
2976                                  struct ctdb_req_header *header,
2977                                  struct ctdb_req_message *request)
2978 {
2979         struct client_state *state = tevent_req_data(
2980                 req, struct client_state);
2981         struct ctdbd_context *ctdb = state->ctdb;
2982         struct ctdb_srvid_message *srvid = request->data.msg;
2983         struct ctdb_req_message_data reply;
2984         int ret = -1;
2985         TDB_DATA data;
2986
2987         if (header->destnode != ctdb->node_map->recmaster) {
2988                 /* No reply! Only recmaster replies... */
2989                 return;
2990         }
2991
2992         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
2993                            header->destnode));
2994         ret = header->destnode;
2995
2996         reply.srvid = srvid->srvid;
2997         data.dptr = (uint8_t *)&ret;
2998         data.dsize = sizeof(int);
2999         reply.data = data;
3000
3001         client_send_message(req, header, &reply);
3002 }
3003
3004 /*
3005  * Handle a single client
3006  */
3007
/* Invoked with each complete packet read from the client socket */
static void client_read_handler(uint8_t *buf, size_t buflen,
				void *private_data);

/* Invoked when the client connection goes away */
static void client_dead_handler(void *private_data);

/* Dispatch a packet by operation type */
static void client_process_packet(struct tevent_req *req,
				  uint8_t *buf, size_t buflen);

/* Handlers for the two supported request types */
static void client_process_message(struct tevent_req *req,
				   uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
				   uint8_t *buf, size_t buflen);

static void client_reply_done(struct tevent_req *subreq);
3018
3019 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3020                                       struct tevent_context *ev,
3021                                       int fd, struct ctdbd_context *ctdb,
3022                                       int pnn)
3023 {
3024         struct tevent_req *req;
3025         struct client_state *state;
3026         struct ucred cr;
3027         socklen_t crl = sizeof(struct ucred);
3028         int ret;
3029
3030         req = tevent_req_create(mem_ctx, &state, struct client_state);
3031         if (req == NULL) {
3032                 return NULL;
3033         }
3034
3035         state->ev = ev;
3036         state->fd = fd;
3037         state->ctdb = ctdb;
3038         state->pnn = pnn;
3039
3040         ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
3041         if (ret != 0) {
3042                 tevent_req_error(req, ret);
3043                 return tevent_req_post(req, ev);
3044         }
3045         state->pid = cr.pid;
3046
3047         ret = comm_setup(state, ev, fd, client_read_handler, req,
3048                          client_dead_handler, req, &state->comm);
3049         if (ret != 0) {
3050                 tevent_req_error(req, ret);
3051                 return tevent_req_post(req, ev);
3052         }
3053
3054         ret = client_add(ctdb, state->pid, state);
3055         if (ret != 0) {
3056                 tevent_req_error(req, ret);
3057                 return tevent_req_post(req, ev);
3058         }
3059
3060         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3061
3062         return req;
3063 }
3064
3065 static void client_read_handler(uint8_t *buf, size_t buflen,
3066                                 void *private_data)
3067 {
3068         struct tevent_req *req = talloc_get_type_abort(
3069                 private_data, struct tevent_req);
3070         struct client_state *state = tevent_req_data(
3071                 req, struct client_state);
3072         struct ctdbd_context *ctdb = state->ctdb;
3073         struct ctdb_req_header header;
3074         size_t np;
3075         int ret, i;
3076
3077         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3078         if (ret != 0) {
3079                 return;
3080         }
3081
3082         if (buflen != header.length) {
3083                 return;
3084         }
3085
3086         ret = ctdb_req_header_verify(&header, 0);
3087         if (ret != 0) {
3088                 return;
3089         }
3090
3091         header_fix_pnn(&header, ctdb);
3092
3093         if (header.destnode == CTDB_BROADCAST_ALL) {
3094                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3095                         header.destnode = i;
3096
3097                         ctdb_req_header_push(&header, buf, &np);
3098                         client_process_packet(req, buf, buflen);
3099                 }
3100                 return;
3101         }
3102
3103         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3104                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3105                         if (ctdb->node_map->node[i].flags &
3106                             NODE_FLAGS_DISCONNECTED) {
3107                                 continue;
3108                         }
3109
3110                         header.destnode = i;
3111
3112                         ctdb_req_header_push(&header, buf, &np);
3113                         client_process_packet(req, buf, buflen);
3114                 }
3115                 return;
3116         }
3117
3118         if (header.destnode > ctdb->node_map->num_nodes) {
3119                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3120                         header.destnode);
3121                 return;
3122         }
3123
3124
3125         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3126                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3127                         header.destnode);
3128                 return;
3129         }
3130
3131         ctdb_req_header_push(&header, buf, &np);
3132         client_process_packet(req, buf, buflen);
3133 }
3134
3135 static void client_dead_handler(void *private_data)
3136 {
3137         struct tevent_req *req = talloc_get_type_abort(
3138                 private_data, struct tevent_req);
3139
3140         tevent_req_done(req);
3141 }
3142
3143 static void client_process_packet(struct tevent_req *req,
3144                                   uint8_t *buf, size_t buflen)
3145 {
3146         struct ctdb_req_header header;
3147         size_t np;
3148         int ret;
3149
3150         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3151         if (ret != 0) {
3152                 return;
3153         }
3154
3155         switch (header.operation) {
3156         case CTDB_REQ_MESSAGE:
3157                 client_process_message(req, buf, buflen);
3158                 break;
3159
3160         case CTDB_REQ_CONTROL:
3161                 client_process_control(req, buf, buflen);
3162                 break;
3163
3164         default:
3165                 break;
3166         }
3167 }
3168
3169 static void client_process_message(struct tevent_req *req,
3170                                    uint8_t *buf, size_t buflen)
3171 {
3172         struct client_state *state = tevent_req_data(
3173                 req, struct client_state);
3174         struct ctdbd_context *ctdb = state->ctdb;
3175         TALLOC_CTX *mem_ctx;
3176         struct ctdb_req_header header;
3177         struct ctdb_req_message request;
3178         uint64_t srvid;
3179         int ret;
3180
3181         mem_ctx = talloc_new(state);
3182         if (tevent_req_nomem(mem_ctx, req)) {
3183                 return;
3184         }
3185
3186         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
3187         if (ret != 0) {
3188                 talloc_free(mem_ctx);
3189                 tevent_req_error(req, ret);
3190                 return;
3191         }
3192
3193         header_fix_pnn(&header, ctdb);
3194
3195         if (header.destnode >= ctdb->node_map->num_nodes) {
3196                 /* Many messages are not replied to, so just behave as
3197                  * though this message was not received */
3198                 fprintf(stderr, "Invalid node %d\n", header.destnode);
3199                 talloc_free(mem_ctx);
3200                 return;
3201         }
3202
3203         srvid = request.srvid;
3204         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
3205
3206         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
3207                 message_disable_recoveries(mem_ctx, req, &header, &request);
3208         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
3209                 message_takeover_run(mem_ctx, req, &header, &request);
3210         }
3211
3212         /* check srvid */
3213         talloc_free(mem_ctx);
3214 }
3215
3216 static void client_process_control(struct tevent_req *req,
3217                                    uint8_t *buf, size_t buflen)
3218 {
3219         struct client_state *state = tevent_req_data(
3220                 req, struct client_state);
3221         struct ctdbd_context *ctdb = state->ctdb;
3222         TALLOC_CTX *mem_ctx;
3223         struct ctdb_req_header header;
3224         struct ctdb_req_control request;
3225         int ret;
3226
3227         mem_ctx = talloc_new(state);
3228         if (tevent_req_nomem(mem_ctx, req)) {
3229                 return;
3230         }
3231
3232         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
3233         if (ret != 0) {
3234                 talloc_free(mem_ctx);
3235                 tevent_req_error(req, ret);
3236                 return;
3237         }
3238
3239         header_fix_pnn(&header, ctdb);
3240
3241         if (header.destnode >= ctdb->node_map->num_nodes) {
3242                 struct ctdb_reply_control reply;
3243
3244                 reply.rdata.opcode = request.opcode;
3245                 reply.errmsg = "Invalid node";
3246                 reply.status = -1;
3247                 client_send_control(req, &header, &reply);
3248                 return;
3249         }
3250
3251         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
3252                            request.opcode, header.reqid));
3253
3254         if (fake_control_failure(mem_ctx, req, &header, &request)) {
3255                 goto done;
3256         }
3257
3258         switch (request.opcode) {
3259         case CTDB_CONTROL_PROCESS_EXISTS:
3260                 control_process_exists(mem_ctx, req, &header, &request);
3261                 break;
3262
3263         case CTDB_CONTROL_PING:
3264                 control_ping(mem_ctx, req, &header, &request);
3265                 break;
3266
3267         case CTDB_CONTROL_GETDBPATH:
3268                 control_getdbpath(mem_ctx, req, &header, &request);
3269                 break;
3270
3271         case CTDB_CONTROL_GETVNNMAP:
3272                 control_getvnnmap(mem_ctx, req, &header, &request);
3273                 break;
3274
3275         case CTDB_CONTROL_GET_DEBUG:
3276                 control_get_debug(mem_ctx, req, &header, &request);
3277                 break;
3278
3279         case CTDB_CONTROL_SET_DEBUG:
3280                 control_set_debug(mem_ctx, req, &header, &request);
3281                 break;
3282
3283         case CTDB_CONTROL_GET_DBMAP:
3284                 control_get_dbmap(mem_ctx, req, &header, &request);
3285                 break;
3286
3287         case CTDB_CONTROL_GET_RECMODE:
3288                 control_get_recmode(mem_ctx, req, &header, &request);
3289                 break;
3290
3291         case CTDB_CONTROL_SET_RECMODE:
3292                 control_set_recmode(mem_ctx, req, &header, &request);
3293                 break;
3294
3295         case CTDB_CONTROL_REGISTER_SRVID:
3296                 control_register_srvid(mem_ctx, req, &header, &request);
3297                 break;
3298
3299         case CTDB_CONTROL_DEREGISTER_SRVID:
3300                 control_deregister_srvid(mem_ctx, req, &header, &request);
3301                 break;
3302
3303         case CTDB_CONTROL_GET_DBNAME:
3304                 control_get_dbname(mem_ctx, req, &header, &request);
3305                 break;
3306
3307         case CTDB_CONTROL_GET_PID:
3308                 control_get_pid(mem_ctx, req, &header, &request);
3309                 break;
3310
3311         case CTDB_CONTROL_GET_RECMASTER:
3312                 control_get_recmaster(mem_ctx, req, &header, &request);
3313                 break;
3314
3315         case CTDB_CONTROL_GET_PNN:
3316                 control_get_pnn(mem_ctx, req, &header, &request);
3317                 break;
3318
3319         case CTDB_CONTROL_SHUTDOWN:
3320                 control_shutdown(mem_ctx, req, &header, &request);
3321                 break;
3322
3323         case CTDB_CONTROL_SET_TUNABLE:
3324                 control_set_tunable(mem_ctx, req, &header, &request);
3325                 break;
3326
3327         case CTDB_CONTROL_GET_TUNABLE:
3328                 control_get_tunable(mem_ctx, req, &header, &request);
3329                 break;
3330
3331         case CTDB_CONTROL_LIST_TUNABLES:
3332                 control_list_tunables(mem_ctx, req, &header, &request);
3333                 break;
3334
3335         case CTDB_CONTROL_MODIFY_FLAGS:
3336                 control_modify_flags(mem_ctx, req, &header, &request);
3337                 break;
3338
3339         case CTDB_CONTROL_GET_ALL_TUNABLES:
3340                 control_get_all_tunables(mem_ctx, req, &header, &request);
3341                 break;
3342
3343         case CTDB_CONTROL_UPTIME:
3344                 control_uptime(mem_ctx, req, &header, &request);
3345                 break;
3346
3347         case CTDB_CONTROL_RELOAD_NODES_FILE:
3348                 control_reload_nodes_file(mem_ctx, req, &header, &request);
3349                 break;
3350
3351         case CTDB_CONTROL_GET_CAPABILITIES:
3352                 control_get_capabilities(mem_ctx, req, &header, &request);
3353                 break;
3354
3355         case CTDB_CONTROL_RELEASE_IP:
3356                 control_release_ip(mem_ctx, req, &header, &request);
3357                 break;
3358
3359         case CTDB_CONTROL_TAKEOVER_IP:
3360                 control_takeover_ip(mem_ctx, req, &header, &request);
3361                 break;
3362
3363         case CTDB_CONTROL_GET_PUBLIC_IPS:
3364                 control_get_public_ips(mem_ctx, req, &header, &request);
3365                 break;
3366
3367         case CTDB_CONTROL_GET_NODEMAP:
3368                 control_get_nodemap(mem_ctx, req, &header, &request);
3369                 break;
3370
3371         case CTDB_CONTROL_GET_RECLOCK_FILE:
3372                 control_get_reclock_file(mem_ctx, req, &header, &request);
3373                 break;
3374
3375         case CTDB_CONTROL_STOP_NODE:
3376                 control_stop_node(mem_ctx, req, &header, &request);
3377                 break;
3378
3379         case CTDB_CONTROL_CONTINUE_NODE:
3380                 control_continue_node(mem_ctx, req, &header, &request);
3381                 break;
3382
3383         case CTDB_CONTROL_SET_BAN_STATE:
3384                 control_set_ban_state(mem_ctx, req, &header, &request);
3385                 break;
3386
3387         case CTDB_CONTROL_GET_DB_SEQNUM:
3388                 control_get_db_seqnum(mem_ctx, req, &header, &request);
3389                 break;
3390
3391         case CTDB_CONTROL_DB_GET_HEALTH:
3392                 control_db_get_health(mem_ctx, req, &header, &request);
3393                 break;
3394
3395         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
3396                 control_get_public_ip_info(mem_ctx, req, &header, &request);
3397                 break;
3398
3399         case CTDB_CONTROL_GET_IFACES:
3400                 control_get_ifaces(mem_ctx, req, &header, &request);
3401                 break;
3402
3403         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
3404                 control_set_iface_link_state(mem_ctx, req, &header, &request);
3405                 break;
3406
3407         case CTDB_CONTROL_SET_DB_READONLY:
3408                 control_set_db_readonly(mem_ctx, req, &header, &request);
3409                 break;
3410
3411         case CTDB_CONTROL_SET_DB_STICKY:
3412                 control_set_db_sticky(mem_ctx, req, &header, &request);
3413                 break;
3414
3415         case CTDB_CONTROL_IPREALLOCATED:
3416                 control_ipreallocated(mem_ctx, req, &header, &request);
3417                 break;
3418
3419         case CTDB_CONTROL_GET_RUNSTATE:
3420                 control_get_runstate(mem_ctx, req, &header, &request);
3421                 break;
3422
3423         case CTDB_CONTROL_GET_NODES_FILE:
3424                 control_get_nodes_file(mem_ctx, req, &header, &request);
3425                 break;
3426
3427         case CTDB_CONTROL_CHECK_PID_SRVID:
3428                 control_check_pid_srvid(mem_ctx, req, &header, &request);
3429                 break;
3430
3431         default:
3432                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
3433                         control_error(mem_ctx, req, &header, &request);
3434                 }
3435                 break;
3436         }
3437
3438 done:
3439         talloc_free(mem_ctx);
3440 }
3441
3442 static int client_recv(struct tevent_req *req, int *perr)
3443 {
3444         struct client_state *state = tevent_req_data(
3445                 req, struct client_state);
3446         int err;
3447
3448         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
3449         close(state->fd);
3450
3451         if (tevent_req_is_unix_error(req, &err)) {
3452                 if (perr != NULL) {
3453                         *perr = err;
3454                 }
3455                 return -1;
3456         }
3457
3458         return state->status;
3459 }
3460
3461 /*
3462  * Fake CTDB server
3463  */
3464
/* State of the accept loop of the fake daemon */
struct server_state {
	struct tevent_context *ev;	/* event context driving the server */
	struct ctdbd_context *ctdb;	/* fake daemon state */
	int fd;				/* socket passed to accept_send() */
};

/* Completion callbacks for the accept and per-client sub-requests */
static void server_new_client(struct tevent_req *subreq);
static void server_client_done(struct tevent_req *subreq);
3473
3474 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
3475                                       struct tevent_context *ev,
3476                                       struct ctdbd_context *ctdb,
3477                                       int fd)
3478 {
3479         struct tevent_req *req, *subreq;
3480         struct server_state *state;
3481
3482         req = tevent_req_create(mem_ctx, &state, struct server_state);
3483         if (req == NULL) {
3484                 return NULL;
3485         }
3486
3487         state->ev = ev;
3488         state->ctdb = ctdb;
3489         state->fd = fd;
3490
3491         subreq = accept_send(state, ev, fd);
3492         if (tevent_req_nomem(subreq, req)) {
3493                 return tevent_req_post(req, ev);
3494         }
3495         tevent_req_set_callback(subreq, server_new_client, req);
3496
3497         return req;
3498 }
3499
3500 static void server_new_client(struct tevent_req *subreq)
3501 {
3502         struct tevent_req *req = tevent_req_callback_data(
3503                 subreq, struct tevent_req);
3504         struct server_state *state = tevent_req_data(
3505                 req, struct server_state);
3506         struct ctdbd_context *ctdb = state->ctdb;
3507         int client_fd;
3508         int ret = 0;
3509
3510         client_fd = accept_recv(subreq, NULL, NULL, &ret);
3511         TALLOC_FREE(subreq);
3512         if (client_fd == -1) {
3513                 tevent_req_error(req, ret);
3514                 return;
3515         }
3516
3517         subreq = client_send(state, state->ev, client_fd,
3518                              ctdb, ctdb->node_map->pnn);
3519         if (tevent_req_nomem(subreq, req)) {
3520                 return;
3521         }
3522         tevent_req_set_callback(subreq, server_client_done, req);
3523
3524         ctdb->num_clients += 1;
3525
3526         subreq = accept_send(state, state->ev, state->fd);
3527         if (tevent_req_nomem(subreq, req)) {
3528                 return;
3529         }
3530         tevent_req_set_callback(subreq, server_new_client, req);
3531 }
3532
3533 static void server_client_done(struct tevent_req *subreq)
3534 {
3535         struct tevent_req *req = tevent_req_callback_data(
3536                 subreq, struct tevent_req);
3537         struct server_state *state = tevent_req_data(
3538                 req, struct server_state);
3539         struct ctdbd_context *ctdb = state->ctdb;
3540         int ret = 0;
3541         int status;
3542
3543         status = client_recv(subreq, &ret);
3544         TALLOC_FREE(subreq);
3545         if (status < 0) {
3546                 tevent_req_error(req, ret);
3547                 return;
3548         }
3549
3550         ctdb->num_clients -= 1;
3551
3552         if (status == 99) {
3553                 /* Special status, to shutdown server */
3554                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
3555                 tevent_req_done(req);
3556         }
3557 }
3558
3559 static bool server_recv(struct tevent_req *req, int *perr)
3560 {
3561         int err;
3562
3563         if (tevent_req_is_unix_error(req, &err)) {
3564                 if (perr != NULL) {
3565                         *perr = err;
3566                 }
3567                 return false;
3568         }
3569         return true;
3570 }
3571
3572 /*
3573  * Main functions
3574  */
3575
3576 static int socket_init(const char *sockpath)
3577 {
3578         struct sockaddr_un addr;
3579         size_t len;
3580         int ret, fd;
3581
3582         memset(&addr, 0, sizeof(addr));
3583         addr.sun_family = AF_UNIX;
3584
3585         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
3586         if (len >= sizeof(addr.sun_path)) {
3587                 fprintf(stderr, "path too long: %s\n", sockpath);
3588                 return -1;
3589         }
3590
3591         fd = socket(AF_UNIX, SOCK_STREAM, 0);
3592         if (fd == -1) {
3593                 fprintf(stderr, "socket failed - %s\n", sockpath);
3594                 return -1;
3595         }
3596
3597         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
3598         if (ret != 0) {
3599                 fprintf(stderr, "bind failed - %s\n", sockpath);
3600                 goto fail;
3601         }
3602
3603         ret = listen(fd, 10);
3604         if (ret != 0) {
3605                 fprintf(stderr, "listen failed\n");
3606                 goto fail;
3607         }
3608
3609         DEBUG(DEBUG_INFO, ("Socket init done\n"));
3610
3611         return fd;
3612
3613 fail:
3614         if (fd != -1) {
3615                 close(fd);
3616         }
3617         return -1;
3618 }
3619
/* Command line settings, filled in by popt through cmdline_options[] */
static struct options {
	/* --socket: path of the unix domain socket to listen on */
	const char *sockpath;
	/* --pidfile: file that receives the pid of the server process */
	const char *pidfile;
	/* --debug: debug level name passed to logging_init() */
	const char *debuglevel;
} options;
3625
3626 static struct poptOption cmdline_options[] = {
3627         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
3628                 "Unix domain socket path", "filename" },
3629         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
3630                 "pid file", "filename" } ,
3631         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
3632                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
3633 };
3634
3635 static void cleanup(void)
3636 {
3637         unlink(options.sockpath);
3638         unlink(options.pidfile);
3639 }
3640
/*
 * SIGTERM handler installed by start_server(): remove the socket and
 * the pid file, then terminate with status 0.
 *
 * _exit() is used instead of exit() because exit() is not
 * async-signal-safe (it runs atexit handlers and flushes stdio, which
 * may deadlock if the signal interrupted such code).  cleanup() only
 * calls unlink(), which is async-signal-safe, and it is invoked
 * explicitly here, so skipping the atexit handler loses nothing.
 */
static void signal_handler(int sig)
{
	cleanup();
	_exit(0);
}
3646
3647 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
3648                          struct ctdbd_context *ctdb, int fd, int pfd)
3649 {
3650         struct tevent_req *req;
3651         int ret = 0;
3652         ssize_t len;
3653
3654         atexit(cleanup);
3655         signal(SIGTERM, signal_handler);
3656
3657         req = server_send(mem_ctx, ev, ctdb, fd);
3658         if (req == NULL) {
3659                 fprintf(stderr, "Memory error\n");
3660                 exit(1);
3661         }
3662
3663         len = write(pfd, &ret, sizeof(ret));
3664         if (len != sizeof(ret)) {
3665                 fprintf(stderr, "Failed to send message to parent\n");
3666                 exit(1);
3667         }
3668         close(pfd);
3669
3670         tevent_req_poll(req, ev);
3671
3672         server_recv(req, &ret);
3673         if (ret != 0) {
3674                 exit(1);
3675         }
3676 }
3677
3678 int main(int argc, const char *argv[])
3679 {
3680         TALLOC_CTX *mem_ctx;
3681         struct ctdbd_context *ctdb;
3682         struct tevent_context *ev;
3683         poptContext pc;
3684         int opt, fd, ret, pfd[2];
3685         ssize_t len;
3686         pid_t pid;
3687         FILE *fp;
3688
3689         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
3690                             POPT_CONTEXT_KEEP_FIRST);
3691         while ((opt = poptGetNextOpt(pc)) != -1) {
3692                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
3693                 exit(1);
3694         }
3695
3696         if (options.sockpath == NULL) {
3697                 fprintf(stderr, "Please specify socket path\n");
3698                 poptPrintHelp(pc, stdout, 0);
3699                 exit(1);
3700         }
3701
3702         if (options.pidfile == NULL) {
3703                 fprintf(stderr, "Please specify pid file\n");
3704                 poptPrintHelp(pc, stdout, 0);
3705                 exit(1);
3706         }
3707
3708         mem_ctx = talloc_new(NULL);
3709         if (mem_ctx == NULL) {
3710                 fprintf(stderr, "Memory error\n");
3711                 exit(1);
3712         }
3713
3714         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
3715         if (ret != 0) {
3716                 fprintf(stderr, "Invalid debug level\n");
3717                 poptPrintHelp(pc, stdout, 0);
3718                 exit(1);
3719         }
3720
3721         ctdb = ctdbd_setup(mem_ctx);
3722         if (ctdb == NULL) {
3723                 exit(1);
3724         }
3725
3726         if (! ctdbd_verify(ctdb)) {
3727                 exit(1);
3728         }
3729
3730         ev = tevent_context_init(mem_ctx);
3731         if (ev == NULL) {
3732                 fprintf(stderr, "Memory error\n");
3733                 exit(1);
3734         }
3735
3736         fd = socket_init(options.sockpath);
3737         if (fd == -1) {
3738                 exit(1);
3739         }
3740
3741         ret = pipe(pfd);
3742         if (ret != 0) {
3743                 fprintf(stderr, "Failed to create pipe\n");
3744                 cleanup();
3745                 exit(1);
3746         }
3747
3748         pid = fork();
3749         if (pid == -1) {
3750                 fprintf(stderr, "Failed to fork\n");
3751                 cleanup();
3752                 exit(1);
3753         }
3754
3755         if (pid == 0) {
3756                 /* Child */
3757                 close(pfd[0]);
3758                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
3759                 exit(1);
3760         }
3761
3762         /* Parent */
3763         close(pfd[1]);
3764
3765         len = read(pfd[0], &ret, sizeof(ret));
3766         close(pfd[0]);
3767         if (len != sizeof(ret)) {
3768                 fprintf(stderr, "len = %zi\n", len);
3769                 fprintf(stderr, "Failed to get message from child\n");
3770                 kill(pid, SIGTERM);
3771                 exit(1);
3772         }
3773
3774         fp = fopen(options.pidfile, "w");
3775         if (fp == NULL) {
3776                 fprintf(stderr, "Failed to open pid file %s\n",
3777                         options.pidfile);
3778                 kill(pid, SIGTERM);
3779                 exit(1);
3780         }
3781         fprintf(fp, "%d\n", pid);
3782         fclose(fp);
3783
3784         return 0;
3785 }