ctdb-tests: Check an unchecked return value
[amitay/samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23
24 #include <popt.h>
25 #include <talloc.h>
26 #include <tevent.h>
27 #include <tdb.h>
28
29 #include "lib/util/dlinklist.h"
30 #include "lib/util/tevent_unix.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/async_req/async_sock.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "protocol/protocol_util.h"
38
39 #include "common/comm.h"
40 #include "common/logging.h"
41 #include "common/tunable.h"
42 #include "common/srvid.h"
43
44 #include "ipalloc_read_known_ips.h"
45
46
/* Port number set on every node address built by nodemap_parse() and
 * node_map_add() */
#define CTDB_PORT 4379

/* A fake flag that is only supported by some functions.  nodemap_parse()
 * ORs it into a node's flags when the input line carries the TIMEOUT
 * token. */
#define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
51
/* One entry of the fake node map, filled in by nodemap_parse() */
struct node {
        /* Node address; nodemap_parse() sets its port to CTDB_PORT */
        ctdb_sock_addr addr;
        /* Node number as given in the NODEMAP input */
        uint32_t pnn;
        /* Node flags from the input; may also carry NODE_FLAGS_FAKE_TIMEOUT */
        uint32_t flags;
        /* Capability bits; nodemap_parse() starts from
         * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER */
        uint32_t capabilities;
        /* Initialised to false by nodemap_parse() */
        bool recovery_disabled;
        /* Initialised to NULL by nodemap_parse() */
        void *recovery_substate;
};
60
/* The set of fake nodes, allocated by nodemap_init() */
struct node_map {
        /* Number of valid entries in node[] */
        uint32_t num_nodes;
        /* Array of nodes, grown one entry at a time by nodemap_parse() */
        struct node *node;
        /* PNN of the line tagged CURRENT; CTDB_UNKNOWN_PNN until one is seen */
        uint32_t pnn;
        /* PNN of the line tagged RECMASTER; CTDB_UNKNOWN_PNN until one is seen */
        uint32_t recmaster;
};
67
/* One network interface, as read by interfaces_parse() */
struct interface {
        /* Interface name (first field of the input line) */
        const char *name;
        /* True if the parsed link state value was non-zero */
        bool link_up;
        /* Reference count (third field of the input line) */
        uint32_t references;
};
73
/* The set of fake interfaces, allocated by interfaces_init() */
struct interface_map {
        /* Number of valid entries in iface[] */
        int num;
        /* Array of interfaces, grown one entry at a time by
         * interfaces_parse() */
        struct interface *iface;
};
78
/* Fake VNN map, allocated by vnnmap_init() and filled by vnnmap_parse() */
struct vnn_map {
        /* CTDB_RECOVERY_ACTIVE after vnnmap_init(); ctdbd_setup() switches it
         * to CTDB_RECOVERY_NORMAL once the input has been read */
        uint32_t recmode;
        /* INVALID_GENERATION until the first VNNMAP line is read or
         * ctdbd_setup() generates a value */
        uint32_t generation;
        /* Number of valid entries in map[] */
        uint32_t size;
        /* One value per input line following the generation line */
        uint32_t *map;
};
85
/* One fake database, as read by dbmap_parse() */
struct database {
        /* Database name (second field of the input line) */
        const char *name;
        /* Database ID (first field of the input line) */
        uint32_t id;
        /* CTDB_DB_FLAGS_* bits built from the flag tokens on the line */
        uint8_t flags;
        /* Sequence number; 0 unless given on the input line */
        uint64_t seq_num;
};
92
/* The set of fake databases, allocated by dbmap_init() */
struct database_map {
        /* Number of valid entries in db[] */
        int num_dbs;
        /* Array of databases, grown one entry at a time by dbmap_parse() */
        struct database *db;
};
97
/* One configured control failure, as read by control_failures_parse() */
struct fake_control_failure {
        /* List linkage; entries are chained with DLIST_ADD() */
        struct fake_control_failure  *prev, *next;
        /* Control opcode (first field of the input line) */
        enum ctdb_controls opcode;
        /* Node number (second field of the input line) */
        uint32_t pnn;
        /* Either "ERROR" or "TIMEOUT"; other values are rejected at parse
         * time */
        const char *error;
        /* Remainder of the input line after the error field */
        const char *comment;
};
105
/* One registered client, created by client_add() */
struct ctdb_client {
        /* List linkage within ctdbd_context.client_list */
        struct ctdb_client *prev, *next;
        /* Context whose client_list this entry is on */
        struct ctdbd_context *ctdb;
        /* Client process ID; the lookup key used by client_find() */
        pid_t pid;
        /* Opaque per-client state returned by client_find(); also the talloc
         * parent of this structure */
        void *state;
};
112
/* Top-level state of the fake daemon, built by ctdbd_setup() */
struct ctdbd_context {
        /* Maps filled from the NODEMAP, IFACES, VNNMAP and DBMAP input
         * sections respectively */
        struct node_map *node_map;
        struct interface_map *iface_map;
        struct vnn_map *vnn_map;
        struct database_map *db_map;
        /* Created by srvid_init() in ctdbd_setup() */
        struct srvid_context *srv;
        int num_clients;
        /* All three timestamps are set to the current time by ctdbd_setup() */
        struct timeval start_time;
        struct timeval recovery_start_time;
        struct timeval recovery_end_time;
        bool takeover_disabled;
        /* Set to DEBUG_ERR by ctdbd_setup() */
        int log_level;
        /* Set to CTDB_RUNSTATE_RUNNING by ctdbd_setup() */
        enum ctdb_runstate runstate;
        /* Filled with defaults by ctdb_tunable_set_defaults() */
        struct ctdb_tunable_list tun_list;
        /* Line read from the RECLOCK section; NULL if that line was blank */
        char *reclock;
        /* Result of ipalloc_read_known_ips() for the PUBLICIPS section */
        struct ctdb_public_ip_list *known_ips;
        /* List built from the CONTROLFAILS section */
        struct fake_control_failure *control_failures;
        /* List of clients registered through client_add() */
        struct ctdb_client *client_list;
};
132
133 /*
134  * Parse routines
135  */
136
137 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
138 {
139         struct node_map *node_map;
140
141         node_map = talloc_zero(mem_ctx, struct node_map);
142         if (node_map == NULL) {
143                 return NULL;
144         }
145
146         node_map->pnn = CTDB_UNKNOWN_PNN;
147         node_map->recmaster = CTDB_UNKNOWN_PNN;
148
149         return node_map;
150 }
151
/* Read a nodemap from stdin.  Each line looks like:
 *  <PNN> <IP> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES] [TIMEOUT]
 * EOF or a blank line terminates input.
 *
 * By default, capabilities for each node are
 * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
 * capabilities can be faked off by adding, for example,
 * -CTDB_CAP_RECMASTER.
 */
161
162 static bool nodemap_parse(struct node_map *node_map)
163 {
164         char line[1024];
165
166         while ((fgets(line, sizeof(line), stdin) != NULL)) {
167                 uint32_t pnn, flags, capabilities;
168                 char *tok, *t;
169                 char *ip;
170                 ctdb_sock_addr saddr;
171                 struct node *node;
172                 int ret;
173
174                 if (line[0] == '\n') {
175                         break;
176                 }
177
178                 /* Get rid of pesky newline */
179                 if ((t = strchr(line, '\n')) != NULL) {
180                         *t = '\0';
181                 }
182
183                 /* Get PNN */
184                 tok = strtok(line, " \t");
185                 if (tok == NULL) {
186                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
187                         continue;
188                 }
189                 pnn = (uint32_t)strtoul(tok, NULL, 0);
190
191                 /* Get IP */
192                 tok = strtok(NULL, " \t");
193                 if (tok == NULL) {
194                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
195                         continue;
196                 }
197                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
198                 if (ret != 0) {
199                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
200                         continue;
201                 }
202                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
203                 ip = talloc_strdup(node_map, tok);
204                 if (ip == NULL) {
205                         goto fail;
206                 }
207
208                 /* Get flags */
209                 tok = strtok(NULL, " \t");
210                 if (tok == NULL) {
211                         fprintf(stderr, "bad line (%s) - missing flags\n",
212                                 line);
213                         continue;
214                 }
215                 flags = (uint32_t)strtoul(tok, NULL, 0);
216                 /* Handle deleted nodes */
217                 if (flags & NODE_FLAGS_DELETED) {
218                         talloc_free(ip);
219                         ip = talloc_strdup(node_map, "0.0.0.0");
220                         if (ip == NULL) {
221                                 goto fail;
222                         }
223                 }
224                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
225
226                 tok = strtok(NULL, " \t");
227                 while (tok != NULL) {
228                         if (strcmp(tok, "CURRENT") == 0) {
229                                 node_map->pnn = pnn;
230                         } else if (strcmp(tok, "RECMASTER") == 0) {
231                                 node_map->recmaster = pnn;
232                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
233                                 capabilities &= ~CTDB_CAP_RECMASTER;
234                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
235                                 capabilities &= ~CTDB_CAP_LMASTER;
236                         } else if (strcmp(tok, "TIMEOUT") == 0) {
237                                 /* This can be done with just a flag
238                                  * value but it is probably clearer
239                                  * and less error-prone to fake this
240                                  * with an explicit token */
241                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
242                         }
243                         tok = strtok(NULL, " \t");
244                 }
245
246                 node_map->node = talloc_realloc(node_map, node_map->node,
247                                                 struct node,
248                                                 node_map->num_nodes + 1);
249                 if (node_map->node == NULL) {
250                         goto fail;
251                 }
252                 node = &node_map->node[node_map->num_nodes];
253
254                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
255                 if (ret != 0) {
256                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
257                         continue;
258                 }
259                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
260                 node->pnn = pnn;
261                 node->flags = flags;
262                 node->capabilities = capabilities;
263                 node->recovery_disabled = false;
264                 node->recovery_substate = NULL;
265
266                 node_map->num_nodes += 1;
267         }
268
269         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
270         return true;
271
272 fail:
273         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
274         return false;
275
276 }
277
278 /* Append a node to a node map with given address and flags */
279 static bool node_map_add(struct ctdb_node_map *nodemap,
280                          const char *nstr, uint32_t flags)
281 {
282         ctdb_sock_addr addr;
283         uint32_t num;
284         struct ctdb_node_and_flags *n;
285         int ret;
286
287         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
288         if (ret != 0) {
289                 fprintf(stderr, "Invalid IP address %s\n", nstr);
290                 return false;
291         }
292         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
293
294         num = nodemap->num;
295         nodemap->node = talloc_realloc(nodemap, nodemap->node,
296                                        struct ctdb_node_and_flags, num+1);
297         if (nodemap->node == NULL) {
298                 return false;
299         }
300
301         n = &nodemap->node[num];
302         n->addr = addr;
303         n->pnn = num;
304         n->flags = flags;
305
306         nodemap->num = num+1;
307         return true;
308 }
309
310 /* Read a nodes file into a node map */
311 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
312                                                   const char *nlist)
313 {
314         char **lines;
315         int nlines;
316         int i;
317         struct ctdb_node_map *nodemap;
318
319         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
320         if (nodemap == NULL) {
321                 return NULL;
322         }
323
324         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
325         if (lines == NULL) {
326                 return NULL;
327         }
328
329         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
330                 nlines--;
331         }
332
333         for (i=0; i<nlines; i++) {
334                 char *node;
335                 uint32_t flags;
336                 size_t len;
337
338                 node = lines[i];
339                 /* strip leading spaces */
340                 while((*node == ' ') || (*node == '\t')) {
341                         node++;
342                 }
343
344                 len = strlen(node);
345
346                 /* strip trailing spaces */
347                 while ((len > 1) &&
348                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
349                 {
350                         node[len-1] = '\0';
351                         len--;
352                 }
353
354                 if (len == 0) {
355                         continue;
356                 }
357                 if (*node == '#') {
358                         /* A "deleted" node is a node that is
359                            commented out in the nodes file.  This is
360                            used instead of removing a line, which
361                            would cause subsequent nodes to change
362                            their PNN. */
363                         flags = NODE_FLAGS_DELETED;
364                         node = discard_const("0.0.0.0");
365                 } else {
366                         flags = 0;
367                 }
368                 if (! node_map_add(nodemap, node, flags)) {
369                         talloc_free(lines);
370                         TALLOC_FREE(nodemap);
371                         return NULL;
372                 }
373         }
374
375         talloc_free(lines);
376         return nodemap;
377 }
378
379 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
380                                              uint32_t pnn)
381 {
382         struct ctdb_node_map *nodemap;
383         char nodepath[PATH_MAX];
384         const char *nodes_list;
385
386         /* read the nodes file */
387         sprintf(nodepath, "CTDB_NODES_%u", pnn);
388         nodes_list = getenv(nodepath);
389         if (nodes_list == NULL) {
390                 nodes_list = getenv("CTDB_NODES");
391                 if (nodes_list == NULL) {
392                         DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
393                         return NULL;
394                 }
395         }
396
397         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
398         if (nodemap == NULL) {
399                 DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
400                                    nodes_list));
401                 return NULL;
402         }
403
404         return nodemap;
405 }
406
407 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
408 {
409         struct interface_map *iface_map;
410
411         iface_map = talloc_zero(mem_ctx, struct interface_map);
412         if (iface_map == NULL) {
413                 return NULL;
414         }
415
416         return iface_map;
417 }
418
419 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
420  * output:
421  *   :Name:LinkStatus:References:
422  *   :eth2:1:4294967294
423  *   :eth1:1:4294967292
424  */
425
426 static bool interfaces_parse(struct interface_map *iface_map)
427 {
428         char line[1024];
429
430         while ((fgets(line, sizeof(line), stdin) != NULL)) {
431                 uint16_t link_state;
432                 uint32_t references;
433                 char *tok, *t, *name;
434                 struct interface *iface;
435
436                 if (line[0] == '\n') {
437                         break;
438                 }
439
440                 /* Get rid of pesky newline */
441                 if ((t = strchr(line, '\n')) != NULL) {
442                         *t = '\0';
443                 }
444
445                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
446                         continue;
447                 }
448
449                 /* Leading colon... */
450                 // tok = strtok(line, ":");
451
452                 /* name */
453                 tok = strtok(line, ":");
454                 if (tok == NULL) {
455                         fprintf(stderr, "bad line (%s) - missing name\n", line);
456                         continue;
457                 }
458                 name = tok;
459
460                 /* link_state */
461                 tok = strtok(NULL, ":");
462                 if (tok == NULL) {
463                         fprintf(stderr, "bad line (%s) - missing link state\n",
464                                 line);
465                         continue;
466                 }
467                 link_state = (uint16_t)strtoul(tok, NULL, 0);
468
469                 /* references... */
470                 tok = strtok(NULL, ":");
471                 if (tok == NULL) {
472                         fprintf(stderr, "bad line (%s) - missing references\n",
473                                 line);
474                         continue;
475                 }
476                 references = (uint32_t)strtoul(tok, NULL, 0);
477
478                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
479                                                   struct interface,
480                                                   iface_map->num + 1);
481                 if (iface_map->iface == NULL) {
482                         goto fail;
483                 }
484
485                 iface = &iface_map->iface[iface_map->num];
486
487                 iface->name = talloc_strdup(iface_map, name);
488                 if (iface->name == NULL) {
489                         goto fail;
490                 }
491                 iface->link_up = link_state;
492                 iface->references = references;
493
494                 iface_map->num += 1;
495         }
496
497         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
498         return true;
499
500 fail:
501         fprintf(stderr, "Parsing interfaces failed\n");
502         return false;
503 }
504
505 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
506 {
507         struct vnn_map *vnn_map;
508
509         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
510         if (vnn_map == NULL) {
511                 fprintf(stderr, "Memory error\n");
512                 return NULL;
513         }
514         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
515         vnn_map->generation = INVALID_GENERATION;
516
517         return vnn_map;
518 }
519
/* Read vnn map from stdin.  EOF or a blank line terminates input.
 * Input format, one value per line:
 *   <GENERATION>
 *   <LMASTER0>
 *   <LMASTER1>
 *   ...
 */
527
528 static bool vnnmap_parse(struct vnn_map *vnn_map)
529 {
530         char line[1024];
531
532         while (fgets(line, sizeof(line), stdin) != NULL) {
533                 uint32_t n;
534                 char *t;
535
536                 if (line[0] == '\n') {
537                         break;
538                 }
539
540                 /* Get rid of pesky newline */
541                 if ((t = strchr(line, '\n')) != NULL) {
542                         *t = '\0';
543                 }
544
545                 n = (uint32_t) strtol(line, NULL, 0);
546
547                 /* generation */
548                 if (vnn_map->generation == INVALID_GENERATION) {
549                         vnn_map->generation = n;
550                         continue;
551                 }
552
553                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
554                                               vnn_map->size + 1);
555                 if (vnn_map->map == NULL) {
556                         fprintf(stderr, "Memory error\n");
557                         goto fail;
558                 }
559
560                 vnn_map->map[vnn_map->size] = n;
561                 vnn_map->size += 1;
562         }
563
564         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
565         return true;
566
567 fail:
568         fprintf(stderr, "Parsing vnnmap failed\n");
569         return false;
570 }
571
572 static bool reclock_parse(struct ctdbd_context *ctdb)
573 {
574         char line[1024];
575         char *t;
576
577         if (fgets(line, sizeof(line), stdin) == NULL) {
578                 goto fail;
579         }
580
581         if (line[0] == '\n') {
582                 /* Recovery lock remains unset */
583                 goto ok;
584         }
585
586         /* Get rid of pesky newline */
587         if ((t = strchr(line, '\n')) != NULL) {
588                 *t = '\0';
589         }
590
591         ctdb->reclock = talloc_strdup(ctdb, line);
592         if (ctdb->reclock == NULL) {
593                 goto fail;
594         }
595 ok:
596         /* Swallow possible blank line following section.  Picky
597          * compiler settings don't allow the return value to be
598          * ignored, so make the compiler happy.
599          */
600         if (fgets(line, sizeof(line), stdin) == NULL) {
601                 ;
602         }
603         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
604         return true;
605
606 fail:
607         fprintf(stderr, "Parsing reclock failed\n");
608         return false;
609 }
610
611 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx)
612 {
613         struct database_map *db_map;
614
615         db_map = talloc_zero(mem_ctx, struct database_map);
616         if (db_map == NULL) {
617                 return NULL;
618         }
619
620         return db_map;
621 }
622
623 /* Read a database map from stdin.  Each line looks like:
624  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
625  * EOF or a blank line terminates input.
626  *
627  * By default, flags and seq_num are 0
628  */
629
630 static bool dbmap_parse(struct database_map *db_map)
631 {
632         char line[1024];
633
634         while ((fgets(line, sizeof(line), stdin) != NULL)) {
635                 uint32_t id;
636                 uint8_t flags = 0;
637                 uint32_t seq_num = 0;
638                 char *tok, *t;
639                 char *name;
640                 struct database *db;
641
642                 if (line[0] == '\n') {
643                         break;
644                 }
645
646                 /* Get rid of pesky newline */
647                 if ((t = strchr(line, '\n')) != NULL) {
648                         *t = '\0';
649                 }
650
651                 /* Get ID */
652                 tok = strtok(line, " \t");
653                 if (tok == NULL) {
654                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
655                         continue;
656                 }
657                 id = (uint32_t)strtoul(tok, NULL, 0);
658
659                 /* Get NAME */
660                 tok = strtok(NULL, " \t");
661                 if (tok == NULL) {
662                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
663                         continue;
664                 }
665                 name = talloc_strdup(db_map, tok);
666                 if (name == NULL) {
667                         goto fail;
668                 }
669
670                 /* Get flags */
671                 tok = strtok(NULL, " \t");
672                 while (tok != NULL) {
673                         if (strcmp(tok, "PERSISTENT") == 0) {
674                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
675                         } else if (strcmp(tok, "STICKY") == 0) {
676                                 flags |= CTDB_DB_FLAGS_STICKY;
677                         } else if (strcmp(tok, "READONLY") == 0) {
678                                 flags |= CTDB_DB_FLAGS_READONLY;
679                         } else if (strcmp(tok, "REPLICATED") == 0) {
680                                 flags |= CTDB_DB_FLAGS_REPLICATED;
681                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
682                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
683                                              CTDB_DB_FLAGS_REPLICATED;
684
685                                 if ((flags & nv) == 0) {
686                                         fprintf(stderr,
687                                                 "seq_num for volatile db\n");
688                                         goto fail;
689                                 }
690                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
691                         }
692
693                         tok = strtok(NULL, " \t");
694                 }
695
696                 db_map->db = talloc_realloc(db_map, db_map->db,
697                                             struct database,
698                                             db_map->num_dbs + 1);
699                 if (db_map->db == NULL) {
700                         goto fail;
701                 }
702                 db = &db_map->db[db_map->num_dbs];
703
704                 db->id = id;
705                 db->name = name;
706                 db->flags = flags;
707                 db->seq_num = seq_num;
708
709                 db_map->num_dbs += 1;
710         }
711
712         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
713         return true;
714
715 fail:
716         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
717         return false;
718
719 }
720
721 static struct database *database_find(struct database_map *map,
722                                       uint32_t db_id)
723 {
724         int i;
725
726         for (i = 0; i < map->num_dbs; i++) {
727                 struct database *db = &map->db[i];
728
729                 if (db->id == db_id) {
730                         return db;
731                 }
732         }
733
734         return NULL;
735 }
736
737 static bool public_ips_parse(struct ctdbd_context *ctdb,
738                              uint32_t numnodes)
739 {
740         if (numnodes == 0) {
741                 D_ERR("Must initialise nodemap before public IPs\n");
742                 return false;
743         }
744
745         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
746
747         return (ctdb->known_ips != NULL);
748 }
749
750 /* Read information about controls to fail.  Format is:
751  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
752  */
753 static bool control_failures_parse(struct ctdbd_context *ctdb)
754 {
755         char line[1024];
756
757         while ((fgets(line, sizeof(line), stdin) != NULL)) {
758                 char *tok, *t;
759                 enum ctdb_controls opcode;
760                 uint32_t pnn;
761                 const char *error;
762                 const char *comment;
763                 struct fake_control_failure *failure = NULL;
764
765                 if (line[0] == '\n') {
766                         break;
767                 }
768
769                 /* Get rid of pesky newline */
770                 if ((t = strchr(line, '\n')) != NULL) {
771                         *t = '\0';
772                 }
773
774                 /* Get opcode */
775                 tok = strtok(line, " \t");
776                 if (tok == NULL) {
777                         D_ERR("bad line (%s) - missing opcode\n", line);
778                         continue;
779                 }
780                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
781
782                 /* Get PNN */
783                 tok = strtok(NULL, " \t");
784                 if (tok == NULL) {
785                         D_ERR("bad line (%s) - missing PNN\n", line);
786                         continue;
787                 }
788                 pnn = (uint32_t)strtoul(tok, NULL, 0);
789
790                 /* Get error */
791                 tok = strtok(NULL, " \t");
792                 if (tok == NULL) {
793                         D_ERR("bad line (%s) - missing errno\n", line);
794                         continue;
795                 }
796                 error = talloc_strdup(ctdb, tok);
797                 if (error == NULL) {
798                         goto fail;
799                 }
800                 if (strcmp(error, "ERROR") != 0 &&
801                     strcmp(error, "TIMEOUT") != 0) {
802                         D_ERR("bad line (%s) "
803                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
804                               line);
805                         goto fail;
806                 }
807
808                 /* Get comment */
809                 tok = strtok(NULL, "\n"); /* rest of line */
810                 if (tok == NULL) {
811                         D_ERR("bad line (%s) - missing comment\n", line);
812                         continue;
813                 }
814                 comment = talloc_strdup(ctdb, tok);
815                 if (comment == NULL) {
816                         goto fail;
817                 }
818
819                 failure = talloc_zero(ctdb, struct fake_control_failure);
820                 if (failure == NULL) {
821                         goto fail;
822                 }
823
824                 failure->opcode = opcode;
825                 failure->pnn = pnn;
826                 failure->error = error;
827                 failure->comment = comment;
828
829                 DLIST_ADD(ctdb->control_failures, failure);
830         }
831
832         D_INFO("Parsing fake control failures done\n");
833         return true;
834
835 fail:
836         D_INFO("Parsing fake control failures failed\n");
837         return false;
838 }
839
840 /*
841  * Manage clients
842  */
843
844 static int ctdb_client_destructor(struct ctdb_client *client)
845 {
846         DLIST_REMOVE(client->ctdb->client_list, client);
847         return 0;
848 }
849
850 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
851                       void *client_state)
852 {
853         struct ctdb_client *client;
854
855         client = talloc_zero(client_state, struct ctdb_client);
856         if (client == NULL) {
857                 return ENOMEM;
858         }
859
860         client->ctdb = ctdb;
861         client->pid = client_pid;
862         client->state = client_state;
863
864         DLIST_ADD(ctdb->client_list, client);
865         talloc_set_destructor(client, ctdb_client_destructor);
866         return 0;
867 }
868
869 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
870 {
871         struct ctdb_client *client;
872
873         for (client=ctdb->client_list; client != NULL; client=client->next) {
874                 if (client->pid == client_pid) {
875                         return client->state;
876                 }
877         }
878
879         return NULL;
880 }
881
882 /*
883  * CTDB context setup
884  */
885
886 static uint32_t new_generation(uint32_t old_generation)
887 {
888         uint32_t generation;
889
890         while (1) {
891                 generation = random();
892                 if (generation != INVALID_GENERATION &&
893                     generation != old_generation) {
894                         break;
895                 }
896         }
897
898         return generation;
899 }
900
901 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
902 {
903         struct ctdbd_context *ctdb;
904         char line[1024];
905         bool status;
906         int ret;
907
908         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
909         if (ctdb == NULL) {
910                 return NULL;
911         }
912
913         ctdb->node_map = nodemap_init(ctdb);
914         if (ctdb->node_map == NULL) {
915                 goto fail;
916         }
917
918         ctdb->iface_map = interfaces_init(ctdb);
919         if (ctdb->iface_map == NULL) {
920                 goto fail;
921         }
922
923         ctdb->vnn_map = vnnmap_init(ctdb);
924         if (ctdb->vnn_map == NULL) {
925                 goto fail;
926         }
927
928         ctdb->db_map = dbmap_init(ctdb);
929         if (ctdb->db_map == NULL) {
930                 goto fail;
931         }
932
933         ret = srvid_init(ctdb, &ctdb->srv);
934         if (ret != 0) {
935                 goto fail;
936         }
937
938         while (fgets(line, sizeof(line), stdin) != NULL) {
939                 char *t;
940
941                 if ((t = strchr(line, '\n')) != NULL) {
942                         *t = '\0';
943                 }
944
945                 if (strcmp(line, "NODEMAP") == 0) {
946                         status = nodemap_parse(ctdb->node_map);
947                 } else if (strcmp(line, "IFACES") == 0) {
948                         status = interfaces_parse(ctdb->iface_map);
949                 } else if (strcmp(line, "VNNMAP") == 0) {
950                         status = vnnmap_parse(ctdb->vnn_map);
951                 } else if (strcmp(line, "DBMAP") == 0) {
952                         status = dbmap_parse(ctdb->db_map);
953                 } else if (strcmp(line, "PUBLICIPS") == 0) {
954                         status = public_ips_parse(ctdb,
955                                                   ctdb->node_map->num_nodes);
956                 } else if (strcmp(line, "RECLOCK") == 0) {
957                         status = reclock_parse(ctdb);
958                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
959                         status = control_failures_parse(ctdb);
960                 } else {
961                         fprintf(stderr, "Unknown line %s\n", line);
962                         status = false;
963                 }
964
965                 if (! status) {
966                         goto fail;
967                 }
968         }
969
970         ctdb->start_time = tevent_timeval_current();
971         ctdb->recovery_start_time = tevent_timeval_current();
972         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
973         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
974                 ctdb->vnn_map->generation =
975                         new_generation(ctdb->vnn_map->generation);
976         }
977         ctdb->recovery_end_time = tevent_timeval_current();
978
979         ctdb->log_level = DEBUG_ERR;
980         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
981
982         ctdb_tunable_set_defaults(&ctdb->tun_list);
983
984         return ctdb;
985
986 fail:
987         TALLOC_FREE(ctdb);
988         return NULL;
989 }
990
991 static bool ctdbd_verify(struct ctdbd_context *ctdb)
992 {
993         struct node *node;
994         int i;
995
996         if (ctdb->node_map->num_nodes == 0) {
997                 return true;
998         }
999
1000         /* Make sure all the nodes are in order */
1001         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1002                 node = &ctdb->node_map->node[i];
1003                 if (node->pnn != i) {
1004                         fprintf(stderr, "Expected node %u, found %u\n",
1005                                 i, node->pnn);
1006                         return false;
1007                 }
1008         }
1009
1010         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1011         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1012                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1013                 exit(0);
1014         }
1015
1016         return true;
1017 }
1018
1019 /*
1020  * Doing a recovery
1021  */
1022
/* Per-request state of a fake recovery started by recover_send() */
struct recover_state {
        struct tevent_context *ev;      /* event context used for the timers */
        struct ctdbd_context *ctdb;     /* daemon context being "recovered" */
};

/* Arm the next one second timer; returns 0 or an errno value */
static int recover_check(struct tevent_req *req);
/* Timer callback used while some node has recovery disabled */
static void recover_wait_done(struct tevent_req *subreq);
/* Timer callback that completes the recovery */
static void recover_done(struct tevent_req *subreq);
1031
1032 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1033                                        struct tevent_context *ev,
1034                                        struct ctdbd_context *ctdb)
1035 {
1036         struct tevent_req *req;
1037         struct recover_state *state;
1038         int ret;
1039
1040         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1041         if (req == NULL) {
1042                 return NULL;
1043         }
1044
1045         state->ev = ev;
1046         state->ctdb = ctdb;
1047
1048         ret = recover_check(req);
1049         if (ret != 0) {
1050                 tevent_req_error(req, ret);
1051                 return tevent_req_post(req, ev);
1052         }
1053
1054         return req;
1055 }
1056
1057 static int recover_check(struct tevent_req *req)
1058 {
1059         struct recover_state *state = tevent_req_data(
1060                 req, struct recover_state);
1061         struct ctdbd_context *ctdb = state->ctdb;
1062         struct tevent_req *subreq;
1063         bool recovery_disabled;
1064         int i;
1065
1066         recovery_disabled = false;
1067         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1068                 if (ctdb->node_map->node[i].recovery_disabled) {
1069                         recovery_disabled = true;
1070                         break;
1071                 }
1072         }
1073
1074         subreq = tevent_wakeup_send(state, state->ev,
1075                                     tevent_timeval_current_ofs(1, 0));
1076         if (subreq == NULL) {
1077                 return ENOMEM;
1078         }
1079
1080         if (recovery_disabled) {
1081                 tevent_req_set_callback(subreq, recover_wait_done, req);
1082         } else {
1083                 ctdb->recovery_start_time = tevent_timeval_current();
1084                 tevent_req_set_callback(subreq, recover_done, req);
1085         }
1086
1087         return 0;
1088 }
1089
1090 static void recover_wait_done(struct tevent_req *subreq)
1091 {
1092         struct tevent_req *req = tevent_req_callback_data(
1093                 subreq, struct tevent_req);
1094         int ret;
1095         bool status;
1096
1097         status = tevent_wakeup_recv(subreq);
1098         TALLOC_FREE(subreq);
1099         if (! status) {
1100                 tevent_req_error(req, EIO);
1101                 return;
1102         }
1103
1104         ret = recover_check(req);
1105         if (ret != 0) {
1106                 tevent_req_error(req, ret);
1107         }
1108 }
1109
1110 static void recover_done(struct tevent_req *subreq)
1111 {
1112         struct tevent_req *req = tevent_req_callback_data(
1113                 subreq, struct tevent_req);
1114         struct recover_state *state = tevent_req_data(
1115                 req, struct recover_state);
1116         struct ctdbd_context *ctdb = state->ctdb;
1117         bool status;
1118
1119         status = tevent_wakeup_recv(subreq);
1120         TALLOC_FREE(subreq);
1121         if (! status) {
1122                 tevent_req_error(req, EIO);
1123                 return;
1124         }
1125
1126         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1127         ctdb->recovery_end_time = tevent_timeval_current();
1128         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1129
1130         tevent_req_done(req);
1131 }
1132
/*
 * Collect the result of a recover_send() request.
 *
 * Returns true on success.  On failure returns false and, when perr is
 * not NULL, stores the error code in *perr.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
1146
1147 /*
1148  * Routines for ctdb_req_header
1149  */
1150
1151 static void header_fix_pnn(struct ctdb_req_header *header,
1152                            struct ctdbd_context *ctdb)
1153 {
1154         if (header->srcnode == CTDB_CURRENT_NODE) {
1155                 header->srcnode = ctdb->node_map->pnn;
1156         }
1157
1158         if (header->destnode == CTDB_CURRENT_NODE) {
1159                 header->destnode = ctdb->node_map->pnn;
1160         }
1161 }
1162
1163 static struct ctdb_req_header header_reply_control(
1164                                         struct ctdb_req_header *header,
1165                                         struct ctdbd_context *ctdb)
1166 {
1167         struct ctdb_req_header reply_header;
1168
1169         reply_header = (struct ctdb_req_header) {
1170                 .ctdb_magic = CTDB_MAGIC,
1171                 .ctdb_version = CTDB_PROTOCOL,
1172                 .generation = ctdb->vnn_map->generation,
1173                 .operation = CTDB_REPLY_CONTROL,
1174                 .destnode = header->srcnode,
1175                 .srcnode = header->destnode,
1176                 .reqid = header->reqid,
1177         };
1178
1179         return reply_header;
1180 }
1181
1182 static struct ctdb_req_header header_reply_message(
1183                                         struct ctdb_req_header *header,
1184                                         struct ctdbd_context *ctdb)
1185 {
1186         struct ctdb_req_header reply_header;
1187
1188         reply_header = (struct ctdb_req_header) {
1189                 .ctdb_magic = CTDB_MAGIC,
1190                 .ctdb_version = CTDB_PROTOCOL,
1191                 .generation = ctdb->vnn_map->generation,
1192                 .operation = CTDB_REQ_MESSAGE,
1193                 .destnode = header->srcnode,
1194                 .srcnode = header->destnode,
1195                 .reqid = 0,
1196         };
1197
1198         return reply_header;
1199 }
1200
1201 /*
1202  * Client state
1203  */
1204
/* State kept for one connected client (the tevent_req state data) */
struct client_state {
        struct tevent_context *ev;      /* event context for async work */
        int fd;
        struct ctdbd_context *ctdb;     /* owning daemon context */
        int pnn;
        pid_t pid;
        struct comm_context *comm;      /* used to write replies */
        struct srvid_register_state *rstate;
        int status;                     /* set to 99 by control_shutdown() */
};
1215
1216 /*
1217  * Send replies to controls and messages
1218  */
1219
/* Completion callback for the comm_write_send() issued by the senders */
static void client_reply_done(struct tevent_req *subreq);
1221
1222 static void client_send_message(struct tevent_req *req,
1223                                 struct ctdb_req_header *header,
1224                                 struct ctdb_req_message_data *message)
1225 {
1226         struct client_state *state = tevent_req_data(
1227                 req, struct client_state);
1228         struct ctdbd_context *ctdb = state->ctdb;
1229         struct tevent_req *subreq;
1230         struct ctdb_req_header reply_header;
1231         uint8_t *buf;
1232         size_t datalen, buflen;
1233         int ret;
1234
1235         reply_header = header_reply_message(header, ctdb);
1236
1237         datalen = ctdb_req_message_data_len(&reply_header, message);
1238         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1239         if (ret != 0) {
1240                 tevent_req_error(req, ret);
1241                 return;
1242         }
1243
1244         ret = ctdb_req_message_data_push(&reply_header, message,
1245                                          buf, &buflen);
1246         if (ret != 0) {
1247                 tevent_req_error(req, ret);
1248                 return;
1249         }
1250
1251         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1252
1253         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1254         if (tevent_req_nomem(subreq, req)) {
1255                 return;
1256         }
1257         tevent_req_set_callback(subreq, client_reply_done, req);
1258
1259         talloc_steal(subreq, buf);
1260 }
1261
1262 static void client_send_control(struct tevent_req *req,
1263                                 struct ctdb_req_header *header,
1264                                 struct ctdb_reply_control *reply)
1265 {
1266         struct client_state *state = tevent_req_data(
1267                 req, struct client_state);
1268         struct ctdbd_context *ctdb = state->ctdb;
1269         struct tevent_req *subreq;
1270         struct ctdb_req_header reply_header;
1271         uint8_t *buf;
1272         size_t datalen, buflen;
1273         int ret;
1274
1275         reply_header = header_reply_control(header, ctdb);
1276
1277         datalen = ctdb_reply_control_len(&reply_header, reply);
1278         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1279         if (ret != 0) {
1280                 tevent_req_error(req, ret);
1281                 return;
1282         }
1283
1284         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1285         if (ret != 0) {
1286                 tevent_req_error(req, ret);
1287                 return;
1288         }
1289
1290         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1291
1292         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1293         if (tevent_req_nomem(subreq, req)) {
1294                 return;
1295         }
1296         tevent_req_set_callback(subreq, client_reply_done, req);
1297
1298         talloc_steal(subreq, buf);
1299 }
1300
1301 static void client_reply_done(struct tevent_req *subreq)
1302 {
1303         struct tevent_req *req = tevent_req_callback_data(
1304                 subreq, struct tevent_req);
1305         int ret;
1306         bool status;
1307
1308         status = comm_write_recv(subreq, &ret);
1309         TALLOC_FREE(subreq);
1310         if (! status) {
1311                 tevent_req_error(req, ret);
1312         }
1313 }
1314
1315 /*
1316  * Handling protocol - controls
1317  */
1318
1319 static void control_process_exists(TALLOC_CTX *mem_ctx,
1320                                    struct tevent_req *req,
1321                                    struct ctdb_req_header *header,
1322                                    struct ctdb_req_control *request)
1323 {
1324         struct client_state *state = tevent_req_data(
1325                 req, struct client_state);
1326         struct ctdbd_context *ctdb = state->ctdb;
1327         struct client_state *cstate;
1328         struct ctdb_reply_control reply;
1329
1330         reply.rdata.opcode = request->opcode;
1331
1332         cstate = client_find(ctdb, request->rdata.data.pid);
1333         if (cstate == NULL) {
1334                 reply.status = -1;
1335                 reply.errmsg = "No client for PID";
1336         } else {
1337                 reply.status = kill(request->rdata.data.pid, 0);
1338                 reply.errmsg = NULL;
1339         }
1340
1341         client_send_control(req, header, &reply);
1342 }
1343
1344 static void control_ping(TALLOC_CTX *mem_ctx,
1345                          struct tevent_req *req,
1346                          struct ctdb_req_header *header,
1347                          struct ctdb_req_control *request)
1348 {
1349         struct client_state *state = tevent_req_data(
1350                 req, struct client_state);
1351         struct ctdbd_context *ctdb = state->ctdb;
1352         struct ctdb_reply_control reply;
1353
1354         reply.rdata.opcode = request->opcode;
1355         reply.status = ctdb->num_clients;
1356         reply.errmsg = NULL;
1357
1358         client_send_control(req, header, &reply);
1359 }
1360
1361 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1362                               struct tevent_req *req,
1363                               struct ctdb_req_header *header,
1364                               struct ctdb_req_control *request)
1365 {
1366         struct client_state *state = tevent_req_data(
1367                 req, struct client_state);
1368         struct ctdbd_context *ctdb = state->ctdb;
1369         struct ctdb_reply_control reply;
1370         struct database *db;
1371
1372         reply.rdata.opcode = request->opcode;
1373
1374         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1375         if (db == NULL) {
1376                 reply.status = ENOENT;
1377                 reply.errmsg = "Database not found";
1378         } else {
1379                 const char *base;
1380                 if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
1381                         base = "/var/lib/ctdb/persistent";
1382                 } else {
1383                         base = "/var/run/ctdb/DB_DIR";
1384                 }
1385                 reply.rdata.data.db_path =
1386                         talloc_asprintf(mem_ctx, "%s/%s.%u",
1387                                         base, db->name, header->destnode);
1388                 if (reply.rdata.data.db_path == NULL) {
1389                         reply.status = ENOMEM;
1390                         reply.errmsg = "Memory error";
1391                 } else {
1392                         reply.status = 0;
1393                         reply.errmsg = NULL;
1394                 }
1395         }
1396
1397         client_send_control(req, header, &reply);
1398 }
1399
1400 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1401                               struct tevent_req *req,
1402                               struct ctdb_req_header *header,
1403                               struct ctdb_req_control *request)
1404 {
1405         struct client_state *state = tevent_req_data(
1406                 req, struct client_state);
1407         struct ctdbd_context *ctdb = state->ctdb;
1408         struct ctdb_reply_control reply;
1409         struct ctdb_vnn_map *vnnmap;
1410
1411         reply.rdata.opcode = request->opcode;
1412
1413         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1414         if (vnnmap == NULL) {
1415                 reply.status = ENOMEM;
1416                 reply.errmsg = "Memory error";
1417         } else {
1418                 vnnmap->generation = ctdb->vnn_map->generation;
1419                 vnnmap->size = ctdb->vnn_map->size;
1420                 vnnmap->map = ctdb->vnn_map->map;
1421
1422                 reply.rdata.data.vnnmap = vnnmap;
1423                 reply.status = 0;
1424                 reply.errmsg = NULL;
1425         }
1426
1427         client_send_control(req, header, &reply);
1428 }
1429
1430 static void control_get_debug(TALLOC_CTX *mem_ctx,
1431                               struct tevent_req *req,
1432                               struct ctdb_req_header *header,
1433                               struct ctdb_req_control *request)
1434 {
1435         struct client_state *state = tevent_req_data(
1436                 req, struct client_state);
1437         struct ctdbd_context *ctdb = state->ctdb;
1438         struct ctdb_reply_control reply;
1439
1440         reply.rdata.opcode = request->opcode;
1441         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1442         reply.status = 0;
1443         reply.errmsg = NULL;
1444
1445         client_send_control(req, header, &reply);
1446 }
1447
1448 static void control_set_debug(TALLOC_CTX *mem_ctx,
1449                               struct tevent_req *req,
1450                               struct ctdb_req_header *header,
1451                               struct ctdb_req_control *request)
1452 {
1453         struct client_state *state = tevent_req_data(
1454                 req, struct client_state);
1455         struct ctdbd_context *ctdb = state->ctdb;
1456         struct ctdb_reply_control reply;
1457
1458         ctdb->log_level = (int)request->rdata.data.loglevel;
1459
1460         reply.rdata.opcode = request->opcode;
1461         reply.status = 0;
1462         reply.errmsg = NULL;
1463
1464         client_send_control(req, header, &reply);
1465 }
1466
1467 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1468                               struct tevent_req *req,
1469                                struct ctdb_req_header *header,
1470                               struct ctdb_req_control *request)
1471 {
1472         struct client_state *state = tevent_req_data(
1473                 req, struct client_state);
1474         struct ctdbd_context *ctdb = state->ctdb;
1475         struct ctdb_reply_control reply;
1476         struct ctdb_dbid_map *dbmap;
1477         int i;
1478
1479         reply.rdata.opcode = request->opcode;
1480
1481         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1482         if (dbmap == NULL) {
1483                 goto fail;
1484         }
1485
1486         dbmap->num = ctdb->db_map->num_dbs;
1487         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1488         if (dbmap->dbs == NULL) {
1489                 goto fail;
1490         }
1491
1492         for (i = 0; i < dbmap->num; i++) {
1493                 struct database *db = &ctdb->db_map->db[i];
1494                 dbmap->dbs[i] = (struct ctdb_dbid) {
1495                         .db_id = db->id,
1496                         .flags = db->flags,
1497                 };
1498         }
1499
1500         reply.rdata.data.dbmap = dbmap;
1501         reply.status = 0;
1502         reply.errmsg = NULL;
1503         client_send_control(req, header, &reply);
1504         return;
1505
1506 fail:
1507         reply.status = -1;
1508         reply.errmsg = "Memory error";
1509         client_send_control(req, header, &reply);
1510 }
1511
1512 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1513                                 struct tevent_req *req,
1514                                 struct ctdb_req_header *header,
1515                                 struct ctdb_req_control *request)
1516 {
1517         struct client_state *state = tevent_req_data(
1518                 req, struct client_state);
1519         struct ctdbd_context *ctdb = state->ctdb;
1520         struct ctdb_reply_control reply;
1521
1522         reply.rdata.opcode = request->opcode;
1523         reply.status = ctdb->vnn_map->recmode;
1524         reply.errmsg = NULL;
1525
1526         client_send_control(req, header, &reply);
1527 }
1528
1529 struct set_recmode_state {
1530         struct tevent_req *req;
1531         struct ctdbd_context *ctdb;
1532         struct ctdb_req_header header;
1533         struct ctdb_reply_control reply;
1534 };
1535
1536 static void set_recmode_callback(struct tevent_req *subreq)
1537 {
1538         struct set_recmode_state *substate = tevent_req_callback_data(
1539                 subreq, struct set_recmode_state);
1540         bool status;
1541         int ret;
1542
1543         status = recover_recv(subreq, &ret);
1544         TALLOC_FREE(subreq);
1545         if (! status) {
1546                 substate->reply.status = ret;
1547                 substate->reply.errmsg = "recovery failed";
1548         } else {
1549                 substate->reply.status = 0;
1550                 substate->reply.errmsg = NULL;
1551         }
1552
1553         client_send_control(substate->req, &substate->header, &substate->reply);
1554         talloc_free(substate);
1555 }
1556
1557 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1558                                 struct tevent_req *req,
1559                                 struct ctdb_req_header *header,
1560                                 struct ctdb_req_control *request)
1561 {
1562         struct client_state *state = tevent_req_data(
1563                 req, struct client_state);
1564         struct tevent_req *subreq;
1565         struct ctdbd_context *ctdb = state->ctdb;
1566         struct set_recmode_state *substate;
1567         struct ctdb_reply_control reply;
1568
1569         reply.rdata.opcode = request->opcode;
1570
1571         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1572                 reply.status = -1;
1573                 reply.errmsg = "Client cannot set recmode to NORMAL";
1574                 goto fail;
1575         }
1576
1577         substate = talloc_zero(ctdb, struct set_recmode_state);
1578         if (substate == NULL) {
1579                 reply.status = -1;
1580                 reply.errmsg = "Memory error";
1581                 goto fail;
1582         }
1583
1584         substate->req = req;
1585         substate->ctdb = ctdb;
1586         substate->header = *header;
1587         substate->reply.rdata.opcode = request->opcode;
1588
1589         subreq = recover_send(substate, state->ev, state->ctdb);
1590         if (subreq == NULL) {
1591                 talloc_free(substate);
1592                 goto fail;
1593         }
1594         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1595
1596         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1597         return;
1598
1599 fail:
1600         client_send_control(req, header, &reply);
1601
1602 }
1603
1604 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1605 {
1606         printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
1607 }
1608
1609 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1610                                    struct tevent_req *req,
1611                                    struct ctdb_req_header *header,
1612                                    struct ctdb_req_control *request)
1613 {
1614         struct client_state *state = tevent_req_data(
1615                 req, struct client_state);
1616         struct ctdbd_context *ctdb = state->ctdb;
1617         struct ctdb_reply_control reply;
1618         int ret;
1619
1620         reply.rdata.opcode = request->opcode;
1621
1622         ret = srvid_register(ctdb->srv, state, request->srvid,
1623                              srvid_handler, state);
1624         if (ret != 0) {
1625                 reply.status = -1;
1626                 reply.errmsg = "Memory error";
1627                 goto fail;
1628         }
1629
1630         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
1631
1632         reply.status = 0;
1633         reply.errmsg = NULL;
1634
1635 fail:
1636         client_send_control(req, header, &reply);
1637 }
1638
1639 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1640                                      struct tevent_req *req,
1641                                      struct ctdb_req_header *header,
1642                                      struct ctdb_req_control *request)
1643 {
1644         struct client_state *state = tevent_req_data(
1645                 req, struct client_state);
1646         struct ctdbd_context *ctdb = state->ctdb;
1647         struct ctdb_reply_control reply;
1648         int ret;
1649
1650         reply.rdata.opcode = request->opcode;
1651
1652         ret = srvid_deregister(ctdb->srv, request->srvid, state);
1653         if (ret != 0) {
1654                 reply.status = -1;
1655                 reply.errmsg = "srvid not registered";
1656                 goto fail;
1657         }
1658
1659         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
1660
1661         reply.status = 0;
1662         reply.errmsg = NULL;
1663
1664 fail:
1665         client_send_control(req, header, &reply);
1666 }
1667
1668 static void control_get_dbname(TALLOC_CTX *mem_ctx,
1669                                struct tevent_req *req,
1670                                struct ctdb_req_header *header,
1671                                struct ctdb_req_control *request)
1672 {
1673         struct client_state *state = tevent_req_data(
1674                 req, struct client_state);
1675         struct ctdbd_context *ctdb = state->ctdb;
1676         struct ctdb_reply_control reply;
1677         struct database *db;
1678
1679         reply.rdata.opcode = request->opcode;
1680
1681         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1682         if (db == NULL) {
1683                 reply.status = ENOENT;
1684                 reply.errmsg = "Database not found";
1685         } else {
1686                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
1687                 if (reply.rdata.data.db_name == NULL) {
1688                         reply.status = ENOMEM;
1689                         reply.errmsg = "Memory error";
1690                 } else {
1691                         reply.status = 0;
1692                         reply.errmsg = NULL;
1693                 }
1694         }
1695
1696         client_send_control(req, header, &reply);
1697 }
1698
1699 static void control_get_pid(TALLOC_CTX *mem_ctx,
1700                             struct tevent_req *req,
1701                             struct ctdb_req_header *header,
1702                             struct ctdb_req_control *request)
1703 {
1704         struct ctdb_reply_control reply;
1705
1706         reply.rdata.opcode = request->opcode;
1707         reply.status = getpid();
1708         reply.errmsg = NULL;
1709
1710         client_send_control(req, header, &reply);
1711 }
1712
1713 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1714                                   struct tevent_req *req,
1715                                   struct ctdb_req_header *header,
1716                                   struct ctdb_req_control *request)
1717 {
1718         struct client_state *state = tevent_req_data(
1719                 req, struct client_state);
1720         struct ctdbd_context *ctdb = state->ctdb;
1721         struct ctdb_reply_control reply;
1722
1723         reply.rdata.opcode = request->opcode;
1724         reply.status = ctdb->node_map->recmaster;
1725         reply.errmsg = NULL;
1726
1727         client_send_control(req, header, &reply);
1728 }
1729
1730 static void control_get_pnn(TALLOC_CTX *mem_ctx,
1731                             struct tevent_req *req,
1732                             struct ctdb_req_header *header,
1733                             struct ctdb_req_control *request)
1734 {
1735         struct ctdb_reply_control reply;
1736
1737         reply.rdata.opcode = request->opcode;
1738         reply.status = header->destnode;
1739         reply.errmsg = NULL;
1740
1741         client_send_control(req, header, &reply);
1742 }
1743
1744 static void control_shutdown(TALLOC_CTX *mem_ctx,
1745                              struct tevent_req *req,
1746                              struct ctdb_req_header *hdr,
1747                              struct ctdb_req_control *request)
1748 {
1749         struct client_state *state = tevent_req_data(
1750                 req, struct client_state);
1751
1752         state->status = 99;
1753 }
1754
1755 static void control_set_tunable(TALLOC_CTX *mem_ctx,
1756                                 struct tevent_req *req,
1757                                 struct ctdb_req_header *header,
1758                                 struct ctdb_req_control *request)
1759 {
1760         struct client_state *state = tevent_req_data(
1761                 req, struct client_state);
1762         struct ctdbd_context *ctdb = state->ctdb;
1763         struct ctdb_reply_control reply;
1764         bool ret, obsolete;
1765
1766         reply.rdata.opcode = request->opcode;
1767         reply.errmsg = NULL;
1768
1769         ret = ctdb_tunable_set_value(&ctdb->tun_list,
1770                                      request->rdata.data.tunable->name,
1771                                      request->rdata.data.tunable->value,
1772                                      &obsolete);
1773         if (! ret) {
1774                 reply.status = -1;
1775         } else if (obsolete) {
1776                 reply.status = 1;
1777         } else {
1778                 reply.status = 0;
1779         }
1780
1781         client_send_control(req, header, &reply);
1782 }
1783
1784 static void control_get_tunable(TALLOC_CTX *mem_ctx,
1785                                 struct tevent_req *req,
1786                                 struct ctdb_req_header *header,
1787                                 struct ctdb_req_control *request)
1788 {
1789         struct client_state *state = tevent_req_data(
1790                 req, struct client_state);
1791         struct ctdbd_context *ctdb = state->ctdb;
1792         struct ctdb_reply_control reply;
1793         uint32_t value;
1794         bool ret;
1795
1796         reply.rdata.opcode = request->opcode;
1797         reply.errmsg = NULL;
1798
1799         ret = ctdb_tunable_get_value(&ctdb->tun_list,
1800                                      request->rdata.data.tun_var, &value);
1801         if (! ret) {
1802                 reply.status = -1;
1803         } else {
1804                 reply.rdata.data.tun_value = value;
1805                 reply.status = 0;
1806         }
1807
1808         client_send_control(req, header, &reply);
1809 }
1810
1811 static void control_list_tunables(TALLOC_CTX *mem_ctx,
1812                                   struct tevent_req *req,
1813                                   struct ctdb_req_header *header,
1814                                   struct ctdb_req_control *request)
1815 {
1816         struct ctdb_reply_control reply;
1817         struct ctdb_var_list *var_list;
1818
1819         reply.rdata.opcode = request->opcode;
1820         reply.errmsg = NULL;
1821
1822         var_list = ctdb_tunable_names(mem_ctx);
1823         if (var_list == NULL) {
1824                 reply.status = -1;
1825         } else {
1826                 reply.rdata.data.tun_var_list = var_list;
1827                 reply.status = 0;
1828         }
1829
1830         client_send_control(req, header, &reply);
1831 }
1832
1833 static void control_modify_flags(TALLOC_CTX *mem_ctx,
1834                                  struct tevent_req *req,
1835                                  struct ctdb_req_header *header,
1836                                  struct ctdb_req_control *request)
1837 {
1838         struct client_state *state = tevent_req_data(
1839                 req, struct client_state);
1840         struct ctdbd_context *ctdb = state->ctdb;
1841         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
1842         struct ctdb_reply_control reply;
1843         struct node *node;
1844
1845         reply.rdata.opcode = request->opcode;
1846
1847         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
1848             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1849                 DEBUG(DEBUG_INFO,
1850                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
1851                 reply.status = EINVAL;
1852                 reply.errmsg = "Failed to MODIFY_FLAGS";
1853                 client_send_control(req, header, &reply);
1854                 return;
1855         }
1856
1857         /* There's all sorts of broadcast weirdness here.  Only change
1858          * the specified node, not the destination node of the
1859          * control. */
1860         node = &ctdb->node_map->node[change->pnn];
1861
1862         if ((node->flags &
1863              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
1864             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1865                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
1866                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
1867                 goto done;
1868         }
1869
1870         if ((node->flags &
1871              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
1872             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
1873                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
1874                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
1875                 goto done;
1876         }
1877
1878         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
1879
1880 done:
1881         reply.status = 0;
1882         reply.errmsg = NULL;
1883         client_send_control(req, header, &reply);
1884 }
1885
1886 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
1887                                      struct tevent_req *req,
1888                                      struct ctdb_req_header *header,
1889                                      struct ctdb_req_control *request)
1890 {
1891         struct client_state *state = tevent_req_data(
1892                 req, struct client_state);
1893         struct ctdbd_context *ctdb = state->ctdb;
1894         struct ctdb_reply_control reply;
1895
1896         reply.rdata.opcode = request->opcode;
1897         reply.rdata.data.tun_list = &ctdb->tun_list;
1898         reply.status = 0;
1899         reply.errmsg = NULL;
1900
1901         client_send_control(req, header, &reply);
1902 }
1903
1904 static void control_uptime(TALLOC_CTX *mem_ctx,
1905                            struct tevent_req *req,
1906                            struct ctdb_req_header *header,
1907                            struct ctdb_req_control *request)
1908 {
1909         struct client_state *state = tevent_req_data(
1910                 req, struct client_state);
1911         struct ctdbd_context *ctdb = state->ctdb;
1912         struct ctdb_reply_control reply;
1913         struct ctdb_uptime *uptime;;
1914
1915         reply.rdata.opcode = request->opcode;
1916
1917         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
1918         if (uptime == NULL) {
1919                 goto fail;
1920         }
1921
1922         uptime->current_time = tevent_timeval_current();
1923         uptime->ctdbd_start_time = ctdb->start_time;
1924         uptime->last_recovery_started = ctdb->recovery_start_time;
1925         uptime->last_recovery_finished = ctdb->recovery_end_time;
1926
1927         reply.rdata.data.uptime = uptime;
1928         reply.status = 0;
1929         reply.errmsg = NULL;
1930         client_send_control(req, header, &reply);
1931         return;
1932
1933 fail:
1934         reply.status = -1;
1935         reply.errmsg = "Memory error";
1936         client_send_control(req, header, &reply);
1937 }
1938
1939 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
1940                                       struct tevent_req *req,
1941                                       struct ctdb_req_header *header,
1942                                       struct ctdb_req_control *request)
1943 {
1944         struct client_state *state = tevent_req_data(
1945                 req, struct client_state);
1946         struct ctdbd_context *ctdb = state->ctdb;
1947         struct ctdb_reply_control reply;
1948         struct ctdb_node_map *nodemap;
1949         struct node_map *node_map = ctdb->node_map;
1950         int i;
1951
1952         reply.rdata.opcode = request->opcode;
1953
1954         nodemap = read_nodes_file(mem_ctx, header->destnode);
1955         if (nodemap == NULL) {
1956                 goto fail;
1957         }
1958
1959         for (i=0; i<nodemap->num; i++) {
1960                 struct node *node;
1961
1962                 if (i < node_map->num_nodes &&
1963                     ctdb_sock_addr_same(&nodemap->node[i].addr,
1964                                         &node_map->node[i].addr)) {
1965                         continue;
1966                 }
1967
1968                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
1969                         int ret;
1970
1971                         node = &node_map->node[i];
1972
1973                         node->flags |= NODE_FLAGS_DELETED;
1974                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
1975                                                          false);
1976                         if (ret != 0) {
1977                                 /* Can't happen, but Coverity... */
1978                                 goto fail;
1979                         }
1980
1981                         continue;
1982                 }
1983
1984                 if (i < node_map->num_nodes &&
1985                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
1986                         node = &node_map->node[i];
1987
1988                         node->flags &= ~NODE_FLAGS_DELETED;
1989                         node->addr = nodemap->node[i].addr;
1990
1991                         continue;
1992                 }
1993
1994                 node_map->node = talloc_realloc(node_map, node_map->node,
1995                                                 struct node,
1996                                                 node_map->num_nodes+1);
1997                 if (node_map->node == NULL) {
1998                         goto fail;
1999                 }
2000                 node = &node_map->node[node_map->num_nodes];
2001
2002                 node->addr = nodemap->node[i].addr;
2003                 node->pnn = nodemap->node[i].pnn;
2004                 node->flags = 0;
2005                 node->capabilities = CTDB_CAP_DEFAULT;
2006                 node->recovery_disabled = false;
2007                 node->recovery_substate = NULL;
2008
2009                 node_map->num_nodes += 1;
2010         }
2011
2012         talloc_free(nodemap);
2013
2014         reply.status = 0;
2015         reply.errmsg = NULL;
2016         client_send_control(req, header, &reply);
2017         return;
2018
2019 fail:
2020         reply.status = -1;
2021         reply.errmsg = "Memory error";
2022         client_send_control(req, header, &reply);
2023 }
2024
2025 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2026                                      struct tevent_req *req,
2027                                      struct ctdb_req_header *header,
2028                                      struct ctdb_req_control *request)
2029 {
2030         struct client_state *state = tevent_req_data(
2031                 req, struct client_state);
2032         struct ctdbd_context *ctdb = state->ctdb;
2033         struct ctdb_reply_control reply;
2034         struct node *node;
2035         uint32_t caps = 0;
2036
2037         reply.rdata.opcode = request->opcode;
2038
2039         node = &ctdb->node_map->node[header->destnode];
2040         caps = node->capabilities;
2041
2042         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2043                 /* Don't send reply */
2044                 return;
2045         }
2046
2047         reply.rdata.data.caps = caps;
2048         reply.status = 0;
2049         reply.errmsg = NULL;
2050
2051         client_send_control(req, header, &reply);
2052 }
2053
2054 static void control_release_ip(TALLOC_CTX *mem_ctx,
2055                                struct tevent_req *req,
2056                                struct ctdb_req_header *header,
2057                                struct ctdb_req_control *request)
2058 {
2059         struct client_state *state = tevent_req_data(
2060                 req, struct client_state);
2061         struct ctdbd_context *ctdb = state->ctdb;
2062         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2063         struct ctdb_reply_control reply;
2064         struct ctdb_public_ip_list *ips = NULL;
2065         struct ctdb_public_ip *t = NULL;
2066         int i;
2067
2068         reply.rdata.opcode = request->opcode;
2069
2070         if (ctdb->known_ips == NULL) {
2071                 D_INFO("RELEASE_IP %s - not a public IP\n",
2072                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2073                 goto done;
2074         }
2075
2076         ips = &ctdb->known_ips[header->destnode];
2077
2078         t = NULL;
2079         for (i = 0; i < ips->num; i++) {
2080                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2081                         t = &ips->ip[i];
2082                         break;
2083                 }
2084         }
2085         if (t == NULL) {
2086                 D_INFO("RELEASE_IP %s - not a public IP\n",
2087                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2088                 goto done;
2089         }
2090
2091         if (t->pnn != header->destnode) {
2092                 if (header->destnode == ip->pnn) {
2093                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2094                               ctdb_sock_addr_to_string(mem_ctx,
2095                                                        &ip->addr, false),
2096                               ip->pnn);
2097                         reply.status = -1;
2098                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2099                         client_send_control(req, header, &reply);
2100                         return;
2101                 }
2102
2103                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2104                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2105                        ip->pnn);
2106                 t->pnn = ip->pnn;
2107         } else {
2108                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2109                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2110                           ip->pnn);
2111                 t->pnn = ip->pnn;
2112         }
2113
2114 done:
2115         reply.status = 0;
2116         reply.errmsg = NULL;
2117         client_send_control(req, header, &reply);
2118 }
2119
2120 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2121                                 struct tevent_req *req,
2122                                 struct ctdb_req_header *header,
2123                                 struct ctdb_req_control *request)
2124 {
2125         struct client_state *state = tevent_req_data(
2126                 req, struct client_state);
2127         struct ctdbd_context *ctdb = state->ctdb;
2128         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2129         struct ctdb_reply_control reply;
2130         struct ctdb_public_ip_list *ips = NULL;
2131         struct ctdb_public_ip *t = NULL;
2132         int i;
2133
2134         reply.rdata.opcode = request->opcode;
2135
2136         if (ctdb->known_ips == NULL) {
2137                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2138                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2139                 goto done;
2140         }
2141
2142         ips = &ctdb->known_ips[header->destnode];
2143
2144         t = NULL;
2145         for (i = 0; i < ips->num; i++) {
2146                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2147                         t = &ips->ip[i];
2148                         break;
2149                 }
2150         }
2151         if (t == NULL) {
2152                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2153                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2154                 goto done;
2155         }
2156
2157         if (t->pnn == header->destnode) {
2158                 D_INFO("TAKEOVER_IP %s - redundant\n",
2159                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2160         } else {
2161                 D_NOTICE("TAKEOVER_IP %s\n",
2162                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2163                 t->pnn = ip->pnn;
2164         }
2165
2166 done:
2167         reply.status = 0;
2168         reply.errmsg = NULL;
2169         client_send_control(req, header, &reply);
2170 }
2171
2172 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2173                                    struct tevent_req *req,
2174                                    struct ctdb_req_header *header,
2175                                    struct ctdb_req_control *request)
2176 {
2177         struct client_state *state = tevent_req_data(
2178                 req, struct client_state);
2179         struct ctdbd_context *ctdb = state->ctdb;
2180         struct ctdb_reply_control reply;
2181         struct ctdb_public_ip_list *ips = NULL;
2182
2183         reply.rdata.opcode = request->opcode;
2184
2185         if (ctdb->known_ips == NULL) {
2186                 /* No IPs defined so create a dummy empty struct and ship it */
2187                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2188                 if (ips == NULL) {
2189                         reply.status = ENOMEM;
2190                         reply.errmsg = "Memory error";
2191                         goto done;
2192                 }
2193                 goto ok;
2194         }
2195
2196         ips = &ctdb->known_ips[header->destnode];
2197
2198         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2199                 /* If runstate is not RUNNING or a node is then return
2200                  * no available IPs.  Don't worry about interface
2201                  * states here - we're not faking down to that level.
2202                  */
2203                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
2204                         /* No available IPs: return dummy empty struct */
2205                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2206                         if (ips == NULL) {
2207                                 reply.status = ENOMEM;
2208                                 reply.errmsg = "Memory error";
2209                                 goto done;
2210                         }
2211                 }
2212         }
2213
2214 ok:
2215         reply.rdata.data.pubip_list = ips;
2216         reply.status = 0;
2217         reply.errmsg = NULL;
2218
2219 done:
2220         client_send_control(req, header, &reply);
2221 }
2222
2223 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2224                                 struct tevent_req *req,
2225                                 struct ctdb_req_header *header,
2226                                 struct ctdb_req_control *request)
2227 {
2228         struct client_state *state = tevent_req_data(
2229                 req, struct client_state);
2230         struct ctdbd_context *ctdb = state->ctdb;
2231         struct ctdb_reply_control reply;
2232         struct ctdb_node_map *nodemap;
2233         struct node *node;
2234         int i;
2235
2236         reply.rdata.opcode = request->opcode;
2237
2238         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2239         if (nodemap == NULL) {
2240                 goto fail;
2241         }
2242
2243         nodemap->num = ctdb->node_map->num_nodes;
2244         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2245                                      nodemap->num);
2246         if (nodemap->node == NULL) {
2247                 goto fail;
2248         }
2249
2250         for (i=0; i<nodemap->num; i++) {
2251                 node = &ctdb->node_map->node[i];
2252                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2253                         .pnn = node->pnn,
2254                         .flags = node->flags,
2255                         .addr = node->addr,
2256                 };
2257         }
2258
2259         reply.rdata.data.nodemap = nodemap;
2260         reply.status = 0;
2261         reply.errmsg = NULL;
2262         client_send_control(req, header, &reply);
2263         return;
2264
2265 fail:
2266         reply.status = -1;
2267         reply.errmsg = "Memory error";
2268         client_send_control(req, header, &reply);
2269 }
2270
2271 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2272                                      struct tevent_req *req,
2273                                      struct ctdb_req_header *header,
2274                                      struct ctdb_req_control *request)
2275 {
2276         struct client_state *state = tevent_req_data(
2277                 req, struct client_state);
2278         struct ctdbd_context *ctdb = state->ctdb;
2279         struct ctdb_reply_control reply;
2280
2281         reply.rdata.opcode = request->opcode;
2282
2283         if (ctdb->reclock != NULL) {
2284                 reply.rdata.data.reclock_file =
2285                         talloc_strdup(mem_ctx, ctdb->reclock);
2286                 if (reply.rdata.data.reclock_file == NULL) {
2287                         reply.status = ENOMEM;
2288                         reply.errmsg = "Memory error";
2289                         goto done;
2290                 }
2291         } else {
2292                 reply.rdata.data.reclock_file = NULL;
2293         }
2294
2295         reply.status = 0;
2296         reply.errmsg = NULL;
2297
2298 done:
2299         client_send_control(req, header, &reply);
2300 }
2301
2302 static void control_stop_node(TALLOC_CTX *mem_ctx,
2303                               struct tevent_req *req,
2304                               struct ctdb_req_header *header,
2305                               struct ctdb_req_control *request)
2306 {
2307         struct client_state *state = tevent_req_data(
2308                 req, struct client_state);
2309         struct ctdbd_context *ctdb = state->ctdb;
2310         struct ctdb_reply_control reply;
2311
2312         reply.rdata.opcode = request->opcode;
2313
2314         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2315         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2316
2317         reply.status = 0;
2318         reply.errmsg = NULL;
2319
2320         client_send_control(req, header, &reply);
2321         return;
2322 }
2323
2324 static void control_continue_node(TALLOC_CTX *mem_ctx,
2325                                   struct tevent_req *req,
2326                                   struct ctdb_req_header *header,
2327                                   struct ctdb_req_control *request)
2328 {
2329         struct client_state *state = tevent_req_data(
2330                 req, struct client_state);
2331         struct ctdbd_context *ctdb = state->ctdb;
2332         struct ctdb_reply_control reply;
2333
2334         reply.rdata.opcode = request->opcode;
2335
2336         DEBUG(DEBUG_INFO, ("Continue node\n"));
2337         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2338
2339         reply.status = 0;
2340         reply.errmsg = NULL;
2341
2342         client_send_control(req, header, &reply);
2343         return;
2344 }
2345
2346 static void set_ban_state_callback(struct tevent_req *subreq)
2347 {
2348         struct node *node = tevent_req_callback_data(
2349                 subreq, struct node);
2350         bool status;
2351
2352         status = tevent_wakeup_recv(subreq);
2353         TALLOC_FREE(subreq);
2354         if (! status) {
2355                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2356         }
2357
2358         node->flags &= ~NODE_FLAGS_BANNED;
2359 }
2360
2361 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2362                                   struct tevent_req *req,
2363                                   struct ctdb_req_header *header,
2364                                   struct ctdb_req_control *request)
2365 {
2366         struct client_state *state = tevent_req_data(
2367                 req, struct client_state);
2368         struct tevent_req *subreq;
2369         struct ctdbd_context *ctdb = state->ctdb;
2370         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2371         struct ctdb_reply_control reply;
2372         struct node *node;
2373
2374         reply.rdata.opcode = request->opcode;
2375
2376         if (ban->pnn != header->destnode) {
2377                 DEBUG(DEBUG_INFO,
2378                       ("SET_BAN_STATE control for PNN %d rejected\n",
2379                        ban->pnn));
2380                 reply.status = EINVAL;
2381                 goto fail;
2382         }
2383
2384         node = &ctdb->node_map->node[header->destnode];
2385
2386         if (ban->time == 0) {
2387                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2388                 node->flags &= ~NODE_FLAGS_BANNED;
2389                 goto done;
2390         }
2391
2392         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2393                                     tevent_timeval_current_ofs(
2394                                             ban->time, 0));
2395         if (subreq == NULL) {
2396                 reply.status = ENOMEM;
2397                 goto fail;
2398         }
2399         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2400
2401         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2402         node->flags |= NODE_FLAGS_BANNED;
2403         ctdb->vnn_map->generation = INVALID_GENERATION;
2404
2405 done:
2406         reply.status = 0;
2407         reply.errmsg = NULL;
2408
2409         client_send_control(req, header, &reply);
2410         return;
2411
2412 fail:
2413         reply.errmsg = "Failed to ban node";
2414 }
2415
2416 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2417                                struct tevent_req *req,
2418                                struct ctdb_req_header *header,
2419                                struct ctdb_req_control *request)
2420 {
2421         struct client_state *state = tevent_req_data(
2422                 req, struct client_state);
2423         struct ctdbd_context *ctdb = state->ctdb;
2424         struct ctdb_reply_control reply;
2425         struct database *db;
2426
2427         reply.rdata.opcode = request->opcode;
2428
2429         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2430         if (db == NULL) {
2431                 reply.status = ENOENT;
2432                 reply.errmsg = "Database not found";
2433         } else {
2434                 reply.rdata.data.seqnum = db->seq_num;
2435                 reply.status = 0;
2436                 reply.errmsg = NULL;
2437         }
2438
2439         client_send_control(req, header, &reply);
2440 }
2441
2442 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2443                                   struct tevent_req *req,
2444                                   struct ctdb_req_header *header,
2445                                   struct ctdb_req_control *request)
2446 {
2447         struct client_state *state = tevent_req_data(
2448                 req, struct client_state);
2449         struct ctdbd_context *ctdb = state->ctdb;
2450         struct ctdb_reply_control reply;
2451         struct database *db;
2452
2453         reply.rdata.opcode = request->opcode;
2454
2455         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2456         if (db == NULL) {
2457                 reply.status = ENOENT;
2458                 reply.errmsg = "Database not found";
2459         } else {
2460                 reply.rdata.data.reason = NULL;
2461                 reply.status = 0;
2462                 reply.errmsg = NULL;
2463         }
2464
2465         client_send_control(req, header, &reply);
2466 }
2467
2468 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
2469                                                    struct ctdbd_context *ctdb)
2470 {
2471         struct ctdb_iface_list *iface_list;
2472         struct interface *iface;
2473         int i;
2474
2475         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2476         if (iface_list == NULL) {
2477                 goto done;
2478         }
2479
2480         iface_list->num = ctdb->iface_map->num;
2481         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2482                                          iface_list->num);
2483         if (iface_list->iface == NULL) {
2484                 TALLOC_FREE(iface_list);
2485                 goto done;
2486         }
2487
2488         for (i=0; i<iface_list->num; i++) {
2489                 iface = &ctdb->iface_map->iface[i];
2490                 iface_list->iface[i] = (struct ctdb_iface) {
2491                         .link_state = iface->link_up,
2492                         .references = iface->references,
2493                 };
2494                 strlcpy(iface_list->iface[i].name, iface->name,
2495                         sizeof(iface_list->iface[i].name));
2496         }
2497
2498 done:
2499         return iface_list;
2500 }
2501
2502 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
2503                                        struct tevent_req *req,
2504                                        struct ctdb_req_header *header,
2505                                        struct ctdb_req_control *request)
2506 {
2507         struct client_state *state = tevent_req_data(
2508                 req, struct client_state);
2509         struct ctdbd_context *ctdb = state->ctdb;
2510         struct ctdb_reply_control reply;
2511         ctdb_sock_addr *addr = request->rdata.data.addr;
2512         struct ctdb_public_ip_list *known = NULL;
2513         struct ctdb_public_ip_info *info = NULL;
2514         unsigned i;
2515
2516         reply.rdata.opcode = request->opcode;
2517
2518         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
2519         if (info == NULL) {
2520                 reply.status = ENOMEM;
2521                 reply.errmsg = "Memory error";
2522                 goto done;
2523         }
2524
2525         reply.rdata.data.ipinfo = info;
2526
2527         if (ctdb->known_ips != NULL) {
2528                 known = &ctdb->known_ips[header->destnode];
2529         } else {
2530                 /* No IPs defined so create a dummy empty struct and
2531                  * fall through.  The given IP won't be matched
2532                  * below...
2533                  */
2534                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2535                 if (known == NULL) {
2536                         reply.status = ENOMEM;
2537                         reply.errmsg = "Memory error";
2538                         goto done;
2539                 }
2540         }
2541
2542         for (i = 0; i < known->num; i++) {
2543                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
2544                                            addr)) {
2545                         break;
2546                 }
2547         }
2548
2549         if (i == known->num) {
2550                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
2551                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
2552                 reply.status = -1;
2553                 reply.errmsg = "Unknown address";
2554                 goto done;
2555         }
2556
2557         info->ip = known->ip[i];
2558
2559         /* The fake PUBLICIPS stanza and resulting known_ips data
2560          * don't know anything about interfaces, so completely fake
2561          * this.
2562          */
2563         info->active_idx = 0;
2564
2565         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
2566         if (info->ifaces == NULL) {
2567                 reply.status = ENOMEM;
2568                 reply.errmsg = "Memory error";
2569                 goto done;
2570         }
2571
2572         reply.status = 0;
2573         reply.errmsg = NULL;
2574
2575 done:
2576         client_send_control(req, header, &reply);
2577 }
2578
2579 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
2580                                struct tevent_req *req,
2581                                struct ctdb_req_header *header,
2582                                struct ctdb_req_control *request)
2583 {
2584         struct client_state *state = tevent_req_data(
2585                 req, struct client_state);
2586         struct ctdbd_context *ctdb = state->ctdb;
2587         struct ctdb_reply_control reply;
2588         struct ctdb_iface_list *iface_list;
2589
2590         reply.rdata.opcode = request->opcode;
2591
2592         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
2593         if (iface_list == NULL) {
2594                 goto fail;
2595         }
2596
2597         reply.rdata.data.iface_list = iface_list;
2598         reply.status = 0;
2599         reply.errmsg = NULL;
2600         client_send_control(req, header, &reply);
2601         return;
2602
2603 fail:
2604         reply.status = -1;
2605         reply.errmsg = "Memory error";
2606         client_send_control(req, header, &reply);
2607 }
2608
2609 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
2610                                          struct tevent_req *req,
2611                                          struct ctdb_req_header *header,
2612                                          struct ctdb_req_control *request)
2613 {
2614         struct client_state *state = tevent_req_data(
2615                 req, struct client_state);
2616         struct ctdbd_context *ctdb = state->ctdb;
2617         struct ctdb_reply_control reply;
2618         struct ctdb_iface *in_iface;
2619         struct interface *iface = NULL;
2620         bool link_up = false;
2621         int i;
2622
2623         reply.rdata.opcode = request->opcode;
2624
2625         in_iface = request->rdata.data.iface;
2626
2627         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
2628                 reply.errmsg = "interface name not terminated";
2629                 goto fail;
2630         }
2631
2632         switch (in_iface->link_state) {
2633                 case 0:
2634                         link_up = false;
2635                         break;
2636
2637                 case 1:
2638                         link_up = true;
2639                         break;
2640
2641                 default:
2642                         reply.errmsg = "invalid link state";
2643                         goto fail;
2644         }
2645
2646         if (in_iface->references != 0) {
2647                 reply.errmsg = "references should be 0";
2648                 goto fail;
2649         }
2650
2651         for (i=0; i<ctdb->iface_map->num; i++) {
2652                 if (strcmp(ctdb->iface_map->iface[i].name,
2653                            in_iface->name) == 0) {
2654                         iface = &ctdb->iface_map->iface[i];
2655                         break;
2656                 }
2657         }
2658
2659         if (iface == NULL) {
2660                 reply.errmsg = "interface not found";
2661                 goto fail;
2662         }
2663
2664         iface->link_up = link_up;
2665
2666         reply.status = 0;
2667         reply.errmsg = NULL;
2668         client_send_control(req, header, &reply);
2669         return;
2670
2671 fail:
2672         reply.status = -1;
2673         client_send_control(req, header, &reply);
2674 }
2675
2676 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
2677                                     struct tevent_req *req,
2678                                     struct ctdb_req_header *header,
2679                                     struct ctdb_req_control *request)
2680 {
2681         struct client_state *state = tevent_req_data(
2682                 req, struct client_state);
2683         struct ctdbd_context *ctdb = state->ctdb;
2684         struct ctdb_reply_control reply;
2685         struct database *db;
2686
2687         reply.rdata.opcode = request->opcode;
2688
2689         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2690         if (db == NULL) {
2691                 reply.status = ENOENT;
2692                 reply.errmsg = "Database not found";
2693                 goto done;
2694         }
2695
2696         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2697                 reply.status = EINVAL;
2698                 reply.errmsg = "Can not set READONLY on persistent db";
2699                 goto done;
2700         }
2701
2702         db->flags |= CTDB_DB_FLAGS_READONLY;
2703         reply.status = 0;
2704         reply.errmsg = NULL;
2705
2706 done:
2707         client_send_control(req, header, &reply);
2708 }
2709
2710 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
2711                                     struct tevent_req *req,
2712                                     struct ctdb_req_header *header,
2713                                     struct ctdb_req_control *request)
2714 {
2715         struct client_state *state = tevent_req_data(
2716                 req, struct client_state);
2717         struct ctdbd_context *ctdb = state->ctdb;
2718         struct ctdb_reply_control reply;
2719         struct database *db;
2720
2721         reply.rdata.opcode = request->opcode;
2722
2723         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2724         if (db == NULL) {
2725                 reply.status = ENOENT;
2726                 reply.errmsg = "Database not found";
2727                 goto done;
2728         }
2729
2730         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2731                 reply.status = EINVAL;
2732                 reply.errmsg = "Can not set STICKY on persistent db";
2733                 goto done;
2734         }
2735
2736         db->flags |= CTDB_DB_FLAGS_STICKY;
2737         reply.status = 0;
2738         reply.errmsg = NULL;
2739
2740 done:
2741         client_send_control(req, header, &reply);
2742 }
2743
2744 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
2745                                   struct tevent_req *req,
2746                                   struct ctdb_req_header *header,
2747                                   struct ctdb_req_control *request)
2748 {
2749         struct ctdb_reply_control reply;
2750
2751         /* Always succeed */
2752         reply.rdata.opcode = request->opcode;
2753         reply.status = 0;
2754         reply.errmsg = NULL;
2755
2756         client_send_control(req, header, &reply);
2757 }
2758
2759 static void control_get_runstate(TALLOC_CTX *mem_ctx,
2760                                  struct tevent_req *req,
2761                                  struct ctdb_req_header *header,
2762                                  struct ctdb_req_control *request)
2763 {
2764         struct client_state *state = tevent_req_data(
2765                 req, struct client_state);
2766         struct ctdbd_context *ctdb = state->ctdb;
2767         struct ctdb_reply_control reply;
2768
2769         reply.rdata.opcode = request->opcode;
2770         reply.rdata.data.runstate = ctdb->runstate;
2771         reply.status = 0;
2772         reply.errmsg = NULL;
2773
2774         client_send_control(req, header, &reply);
2775 }
2776
2777 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
2778                                    struct tevent_req *req,
2779                                    struct ctdb_req_header *header,
2780                                    struct ctdb_req_control *request)
2781 {
2782         struct ctdb_reply_control reply;
2783         struct ctdb_node_map *nodemap;
2784
2785         reply.rdata.opcode = request->opcode;
2786
2787         nodemap = read_nodes_file(mem_ctx, header->destnode);
2788         if (nodemap == NULL) {
2789                 goto fail;
2790         }
2791
2792         reply.rdata.data.nodemap = nodemap;
2793         reply.status = 0;
2794         reply.errmsg = NULL;
2795         client_send_control(req, header, &reply);
2796         return;
2797
2798 fail:
2799         reply.status = -1;
2800         reply.errmsg = "Failed to read nodes file";
2801         client_send_control(req, header, &reply);
2802 }
2803
2804 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
2805                                     struct tevent_req *req,
2806                                     struct ctdb_req_header *header,
2807                                     struct ctdb_req_control *request)
2808 {
2809         struct client_state *state = tevent_req_data(
2810                 req, struct client_state);
2811         struct ctdbd_context *ctdb = state->ctdb;
2812         struct ctdb_client *client;
2813         struct client_state *cstate;
2814         struct ctdb_reply_control reply;
2815         bool pid_found, srvid_found;
2816         int ret;
2817
2818         reply.rdata.opcode = request->opcode;
2819
2820         pid_found = false;
2821         srvid_found = false;
2822
2823         for (client=ctdb->client_list; client != NULL; client=client->next) {
2824                 if (client->pid == request->rdata.data.pid_srvid->pid) {
2825                         pid_found = true;
2826                         cstate = (struct client_state *)client->state;
2827                         ret = srvid_exists(ctdb->srv,
2828                                            request->rdata.data.pid_srvid->srvid,
2829                                            cstate);
2830                         if (ret == 0) {
2831                                 srvid_found = true;
2832                                 ret = kill(cstate->pid, 0);
2833                                 if (ret != 0) {
2834                                         reply.status = ret;
2835                                         reply.errmsg = strerror(errno);
2836                                 } else {
2837                                         reply.status = 0;
2838                                         reply.errmsg = NULL;
2839                                 }
2840                         }
2841                 }
2842         }
2843
2844         if (! pid_found) {
2845                 reply.status = -1;
2846                 reply.errmsg = "No client for PID";
2847         } else if (! srvid_found) {
2848                 reply.status = -1;
2849                 reply.errmsg = "No client for PID and SRVID";
2850         }
2851
2852         client_send_control(req, header, &reply);
2853 }
2854
2855 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
2856                                  struct tevent_req *req,
2857                                  struct ctdb_req_header *header,
2858                                  struct ctdb_req_control *request)
2859 {
2860         struct client_state *state = tevent_req_data(
2861                 req, struct client_state);
2862         struct ctdbd_context *ctdb = state->ctdb;
2863         struct ctdb_reply_control reply;
2864         struct fake_control_failure *f = NULL;
2865
2866         D_DEBUG("Checking fake control failure for control %u on node %u\n",
2867                 request->opcode, header->destnode);
2868         for (f = ctdb->control_failures; f != NULL; f = f->next) {
2869                 if (f->opcode == request->opcode &&
2870                     (f->pnn == header->destnode ||
2871                      f->pnn == CTDB_UNKNOWN_PNN)) {
2872
2873                         reply.rdata.opcode = request->opcode;
2874                         if (strcmp(f->error, "TIMEOUT") == 0) {
2875                                 /* Causes no reply */
2876                                 D_ERR("Control %u fake timeout on node %u\n",
2877                                       request->opcode, header->destnode);
2878                                 return true;
2879                         } else if (strcmp(f->error, "ERROR") == 0) {
2880                                 D_ERR("Control %u fake error on node %u\n",
2881                                       request->opcode, header->destnode);
2882                                 reply.status = -1;
2883                                 reply.errmsg = f->comment;
2884                                 client_send_control(req, header, &reply);
2885                                 return true;
2886                         }
2887                 }
2888         }
2889
2890         return false;
2891 }
2892
2893 static void control_error(TALLOC_CTX *mem_ctx,
2894                           struct tevent_req *req,
2895                           struct ctdb_req_header *header,
2896                           struct ctdb_req_control *request)
2897 {
2898         struct ctdb_reply_control reply;
2899
2900         reply.rdata.opcode = request->opcode;
2901         reply.status = -1;
2902         reply.errmsg = "Not implemented";
2903
2904         client_send_control(req, header, &reply);
2905 }
2906
2907 /*
2908  * Handling protocol - messages
2909  */
2910
/*
 * State for a pending "recoveries disabled" period.  It is allocated in
 * message_disable_recoveries() and passed as callback data to
 * disable_recoveries_callback() when the wakeup request completes.
 */
struct disable_recoveries_state {
        /* Node whose recovery_disabled flag the callback clears */
        struct node *node;
};
2914
2915 static void disable_recoveries_callback(struct tevent_req *subreq)
2916 {
2917         struct disable_recoveries_state *substate = tevent_req_callback_data(
2918                 subreq, struct disable_recoveries_state);
2919         bool status;
2920
2921         status = tevent_wakeup_recv(subreq);
2922         TALLOC_FREE(subreq);
2923         if (! status) {
2924                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2925         }
2926
2927         substate->node->recovery_disabled = false;
2928         TALLOC_FREE(substate->node->recovery_substate);
2929 }
2930
2931 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
2932                                        struct tevent_req *req,
2933                                        struct ctdb_req_header *header,
2934                                        struct ctdb_req_message *request)
2935 {
2936         struct client_state *state = tevent_req_data(
2937                 req, struct client_state);
2938         struct tevent_req *subreq;
2939         struct ctdbd_context *ctdb = state->ctdb;
2940         struct disable_recoveries_state *substate;
2941         struct ctdb_disable_message *disable = request->data.disable;
2942         struct ctdb_req_message_data reply;
2943         struct node *node;
2944         int ret = -1;
2945         TDB_DATA data;
2946
2947         node = &ctdb->node_map->node[header->destnode];
2948
2949         if (disable->timeout == 0) {
2950                 TALLOC_FREE(node->recovery_substate);
2951                 node->recovery_disabled = false;
2952                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
2953                                    header->destnode));
2954                 goto done;
2955         }
2956
2957         substate = talloc_zero(ctdb->node_map,
2958                                struct disable_recoveries_state);
2959         if (substate == NULL) {
2960                 goto fail;
2961         }
2962
2963         substate->node = node;
2964
2965         subreq = tevent_wakeup_send(substate, state->ev,
2966                                     tevent_timeval_current_ofs(
2967                                             disable->timeout, 0));
2968         if (subreq == NULL) {
2969                 talloc_free(substate);
2970                 goto fail;
2971         }
2972         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
2973
2974         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
2975                            disable->timeout, header->destnode));
2976         node->recovery_substate = substate;
2977         node->recovery_disabled = true;
2978
2979 done:
2980         ret = header->destnode;
2981
2982 fail:
2983         reply.srvid = disable->srvid;
2984         data.dptr = (uint8_t *)&ret;
2985         data.dsize = sizeof(int);
2986         reply.data = data;
2987
2988         client_send_message(req, header, &reply);
2989 }
2990
2991 static void message_takeover_run(TALLOC_CTX *mem_ctx,
2992                                  struct tevent_req *req,
2993                                  struct ctdb_req_header *header,
2994                                  struct ctdb_req_message *request)
2995 {
2996         struct client_state *state = tevent_req_data(
2997                 req, struct client_state);
2998         struct ctdbd_context *ctdb = state->ctdb;
2999         struct ctdb_srvid_message *srvid = request->data.msg;
3000         struct ctdb_req_message_data reply;
3001         int ret = -1;
3002         TDB_DATA data;
3003
3004         if (header->destnode != ctdb->node_map->recmaster) {
3005                 /* No reply! Only recmaster replies... */
3006                 return;
3007         }
3008
3009         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3010                            header->destnode));
3011         ret = header->destnode;
3012
3013         reply.srvid = srvid->srvid;
3014         data.dptr = (uint8_t *)&ret;
3015         data.dsize = sizeof(int);
3016         reply.data = data;
3017
3018         client_send_message(req, header, &reply);
3019 }
3020
3021 /*
3022  * Handle a single client
3023  */
3024
3025 static void client_read_handler(uint8_t *buf, size_t buflen,
3026                                 void *private_data);
3027 static void client_dead_handler(void *private_data);
3028 static void client_process_packet(struct tevent_req *req,
3029                                   uint8_t *buf, size_t buflen);
3030 static void client_process_message(struct tevent_req *req,
3031                                    uint8_t *buf, size_t buflen);
3032 static void client_process_control(struct tevent_req *req,
3033                                    uint8_t *buf, size_t buflen);
3034 static void client_reply_done(struct tevent_req *subreq);
3035
3036 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3037                                       struct tevent_context *ev,
3038                                       int fd, struct ctdbd_context *ctdb,
3039                                       int pnn)
3040 {
3041         struct tevent_req *req;
3042         struct client_state *state;
3043         struct ucred cr;
3044         socklen_t crl = sizeof(struct ucred);
3045         int ret;
3046
3047         req = tevent_req_create(mem_ctx, &state, struct client_state);
3048         if (req == NULL) {
3049                 return NULL;
3050         }
3051
3052         state->ev = ev;
3053         state->fd = fd;
3054         state->ctdb = ctdb;
3055         state->pnn = pnn;
3056
3057         ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
3058         if (ret != 0) {
3059                 tevent_req_error(req, ret);
3060                 return tevent_req_post(req, ev);
3061         }
3062         state->pid = cr.pid;
3063
3064         ret = comm_setup(state, ev, fd, client_read_handler, req,
3065                          client_dead_handler, req, &state->comm);
3066         if (ret != 0) {
3067                 tevent_req_error(req, ret);
3068                 return tevent_req_post(req, ev);
3069         }
3070
3071         ret = client_add(ctdb, state->pid, state);
3072         if (ret != 0) {
3073                 tevent_req_error(req, ret);
3074                 return tevent_req_post(req, ev);
3075         }
3076
3077         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3078
3079         return req;
3080 }
3081
3082 static void client_read_handler(uint8_t *buf, size_t buflen,
3083                                 void *private_data)
3084 {
3085         struct tevent_req *req = talloc_get_type_abort(
3086                 private_data, struct tevent_req);
3087         struct client_state *state = tevent_req_data(
3088                 req, struct client_state);
3089         struct ctdbd_context *ctdb = state->ctdb;
3090         struct ctdb_req_header header;
3091         size_t np;
3092         int ret, i;
3093
3094         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3095         if (ret != 0) {
3096                 return;
3097         }
3098
3099         if (buflen != header.length) {
3100                 return;
3101         }
3102
3103         ret = ctdb_req_header_verify(&header, 0);
3104         if (ret != 0) {
3105                 return;
3106         }
3107
3108         header_fix_pnn(&header, ctdb);
3109
3110         if (header.destnode == CTDB_BROADCAST_ALL) {
3111                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3112                         header.destnode = i;
3113
3114                         ctdb_req_header_push(&header, buf, &np);
3115                         client_process_packet(req, buf, buflen);
3116                 }
3117                 return;
3118         }
3119
3120         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3121                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3122                         if (ctdb->node_map->node[i].flags &
3123                             NODE_FLAGS_DISCONNECTED) {
3124                                 continue;
3125                         }
3126
3127                         header.destnode = i;
3128
3129                         ctdb_req_header_push(&header, buf, &np);
3130                         client_process_packet(req, buf, buflen);
3131                 }
3132                 return;
3133         }
3134
3135         if (header.destnode > ctdb->node_map->num_nodes) {
3136                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3137                         header.destnode);
3138                 return;
3139         }
3140
3141
3142         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3143                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3144                         header.destnode);
3145                 return;
3146         }
3147
3148         ctdb_req_header_push(&header, buf, &np);
3149         client_process_packet(req, buf, buflen);
3150 }
3151
3152 static void client_dead_handler(void *private_data)
3153 {
3154         struct tevent_req *req = talloc_get_type_abort(
3155                 private_data, struct tevent_req);
3156
3157         tevent_req_done(req);
3158 }
3159
3160 static void client_process_packet(struct tevent_req *req,
3161                                   uint8_t *buf, size_t buflen)
3162 {
3163         struct ctdb_req_header header;
3164         size_t np;
3165         int ret;
3166
3167         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3168         if (ret != 0) {
3169                 return;
3170         }
3171
3172         switch (header.operation) {
3173         case CTDB_REQ_MESSAGE:
3174                 client_process_message(req, buf, buflen);
3175                 break;
3176
3177         case CTDB_REQ_CONTROL:
3178                 client_process_control(req, buf, buflen);
3179                 break;
3180
3181         default:
3182                 break;
3183         }
3184 }
3185
3186 static void client_process_message(struct tevent_req *req,
3187                                    uint8_t *buf, size_t buflen)
3188 {
3189         struct client_state *state = tevent_req_data(
3190                 req, struct client_state);
3191         struct ctdbd_context *ctdb = state->ctdb;
3192         TALLOC_CTX *mem_ctx;
3193         struct ctdb_req_header header;
3194         struct ctdb_req_message request;
3195         uint64_t srvid;
3196         int ret;
3197
3198         mem_ctx = talloc_new(state);
3199         if (tevent_req_nomem(mem_ctx, req)) {
3200                 return;
3201         }
3202
3203         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
3204         if (ret != 0) {
3205                 talloc_free(mem_ctx);
3206                 tevent_req_error(req, ret);
3207                 return;
3208         }
3209
3210         header_fix_pnn(&header, ctdb);
3211
3212         if (header.destnode >= ctdb->node_map->num_nodes) {
3213                 /* Many messages are not replied to, so just behave as
3214                  * though this message was not received */
3215                 fprintf(stderr, "Invalid node %d\n", header.destnode);
3216                 talloc_free(mem_ctx);
3217                 return;
3218         }
3219
3220         srvid = request.srvid;
3221         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
3222
3223         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
3224                 message_disable_recoveries(mem_ctx, req, &header, &request);
3225         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
3226                 message_takeover_run(mem_ctx, req, &header, &request);
3227         }
3228
3229         /* check srvid */
3230         talloc_free(mem_ctx);
3231 }
3232
3233 static void client_process_control(struct tevent_req *req,
3234                                    uint8_t *buf, size_t buflen)
3235 {
3236         struct client_state *state = tevent_req_data(
3237                 req, struct client_state);
3238         struct ctdbd_context *ctdb = state->ctdb;
3239         TALLOC_CTX *mem_ctx;
3240         struct ctdb_req_header header;
3241         struct ctdb_req_control request;
3242         int ret;
3243
3244         mem_ctx = talloc_new(state);
3245         if (tevent_req_nomem(mem_ctx, req)) {
3246                 return;
3247         }
3248
3249         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
3250         if (ret != 0) {
3251                 talloc_free(mem_ctx);
3252                 tevent_req_error(req, ret);
3253                 return;
3254         }
3255
3256         header_fix_pnn(&header, ctdb);
3257
3258         if (header.destnode >= ctdb->node_map->num_nodes) {
3259                 struct ctdb_reply_control reply;
3260
3261                 reply.rdata.opcode = request.opcode;
3262                 reply.errmsg = "Invalid node";
3263                 reply.status = -1;
3264                 client_send_control(req, &header, &reply);
3265                 return;
3266         }
3267
3268         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
3269                            request.opcode, header.reqid));
3270
3271         if (fake_control_failure(mem_ctx, req, &header, &request)) {
3272                 goto done;
3273         }
3274
3275         switch (request.opcode) {
3276         case CTDB_CONTROL_PROCESS_EXISTS:
3277                 control_process_exists(mem_ctx, req, &header, &request);
3278                 break;
3279
3280         case CTDB_CONTROL_PING:
3281                 control_ping(mem_ctx, req, &header, &request);
3282                 break;
3283
3284         case CTDB_CONTROL_GETDBPATH:
3285                 control_getdbpath(mem_ctx, req, &header, &request);
3286                 break;
3287
3288         case CTDB_CONTROL_GETVNNMAP:
3289                 control_getvnnmap(mem_ctx, req, &header, &request);
3290                 break;
3291
3292         case CTDB_CONTROL_GET_DEBUG:
3293                 control_get_debug(mem_ctx, req, &header, &request);
3294                 break;
3295
3296         case CTDB_CONTROL_SET_DEBUG:
3297                 control_set_debug(mem_ctx, req, &header, &request);
3298                 break;
3299
3300         case CTDB_CONTROL_GET_DBMAP:
3301                 control_get_dbmap(mem_ctx, req, &header, &request);
3302                 break;
3303
3304         case CTDB_CONTROL_GET_RECMODE:
3305                 control_get_recmode(mem_ctx, req, &header, &request);
3306                 break;
3307
3308         case CTDB_CONTROL_SET_RECMODE:
3309                 control_set_recmode(mem_ctx, req, &header, &request);
3310                 break;
3311
3312         case CTDB_CONTROL_REGISTER_SRVID:
3313                 control_register_srvid(mem_ctx, req, &header, &request);
3314                 break;
3315
3316         case CTDB_CONTROL_DEREGISTER_SRVID:
3317                 control_deregister_srvid(mem_ctx, req, &header, &request);
3318                 break;
3319
3320         case CTDB_CONTROL_GET_DBNAME:
3321                 control_get_dbname(mem_ctx, req, &header, &request);
3322                 break;
3323
3324         case CTDB_CONTROL_GET_PID:
3325                 control_get_pid(mem_ctx, req, &header, &request);
3326                 break;
3327
3328         case CTDB_CONTROL_GET_RECMASTER:
3329                 control_get_recmaster(mem_ctx, req, &header, &request);
3330                 break;
3331
3332         case CTDB_CONTROL_GET_PNN:
3333                 control_get_pnn(mem_ctx, req, &header, &request);
3334                 break;
3335
3336         case CTDB_CONTROL_SHUTDOWN:
3337                 control_shutdown(mem_ctx, req, &header, &request);
3338                 break;
3339
3340         case CTDB_CONTROL_SET_TUNABLE:
3341                 control_set_tunable(mem_ctx, req, &header, &request);
3342                 break;
3343
3344         case CTDB_CONTROL_GET_TUNABLE:
3345                 control_get_tunable(mem_ctx, req, &header, &request);
3346                 break;
3347
3348         case CTDB_CONTROL_LIST_TUNABLES:
3349                 control_list_tunables(mem_ctx, req, &header, &request);
3350                 break;
3351
3352         case CTDB_CONTROL_MODIFY_FLAGS:
3353                 control_modify_flags(mem_ctx, req, &header, &request);
3354                 break;
3355
3356         case CTDB_CONTROL_GET_ALL_TUNABLES:
3357                 control_get_all_tunables(mem_ctx, req, &header, &request);
3358                 break;
3359
3360         case CTDB_CONTROL_UPTIME:
3361                 control_uptime(mem_ctx, req, &header, &request);
3362                 break;
3363
3364         case CTDB_CONTROL_RELOAD_NODES_FILE:
3365                 control_reload_nodes_file(mem_ctx, req, &header, &request);
3366                 break;
3367
3368         case CTDB_CONTROL_GET_CAPABILITIES:
3369                 control_get_capabilities(mem_ctx, req, &header, &request);
3370                 break;
3371
3372         case CTDB_CONTROL_RELEASE_IP:
3373                 control_release_ip(mem_ctx, req, &header, &request);
3374                 break;
3375
3376         case CTDB_CONTROL_TAKEOVER_IP:
3377                 control_takeover_ip(mem_ctx, req, &header, &request);
3378                 break;
3379
3380         case CTDB_CONTROL_GET_PUBLIC_IPS:
3381                 control_get_public_ips(mem_ctx, req, &header, &request);
3382                 break;
3383
3384         case CTDB_CONTROL_GET_NODEMAP:
3385                 control_get_nodemap(mem_ctx, req, &header, &request);
3386                 break;
3387
3388         case CTDB_CONTROL_GET_RECLOCK_FILE:
3389                 control_get_reclock_file(mem_ctx, req, &header, &request);
3390                 break;
3391
3392         case CTDB_CONTROL_STOP_NODE:
3393                 control_stop_node(mem_ctx, req, &header, &request);
3394                 break;
3395
3396         case CTDB_CONTROL_CONTINUE_NODE:
3397                 control_continue_node(mem_ctx, req, &header, &request);
3398                 break;
3399
3400         case CTDB_CONTROL_SET_BAN_STATE:
3401                 control_set_ban_state(mem_ctx, req, &header, &request);
3402                 break;
3403
3404         case CTDB_CONTROL_GET_DB_SEQNUM:
3405                 control_get_db_seqnum(mem_ctx, req, &header, &request);
3406                 break;
3407
3408         case CTDB_CONTROL_DB_GET_HEALTH:
3409                 control_db_get_health(mem_ctx, req, &header, &request);
3410                 break;
3411
3412         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
3413                 control_get_public_ip_info(mem_ctx, req, &header, &request);
3414                 break;
3415
3416         case CTDB_CONTROL_GET_IFACES:
3417                 control_get_ifaces(mem_ctx, req, &header, &request);
3418                 break;
3419
3420         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
3421                 control_set_iface_link_state(mem_ctx, req, &header, &request);
3422                 break;
3423
3424         case CTDB_CONTROL_SET_DB_READONLY:
3425                 control_set_db_readonly(mem_ctx, req, &header, &request);
3426                 break;
3427
3428         case CTDB_CONTROL_SET_DB_STICKY:
3429                 control_set_db_sticky(mem_ctx, req, &header, &request);
3430                 break;
3431
3432         case CTDB_CONTROL_IPREALLOCATED:
3433                 control_ipreallocated(mem_ctx, req, &header, &request);
3434                 break;
3435
3436         case CTDB_CONTROL_GET_RUNSTATE:
3437                 control_get_runstate(mem_ctx, req, &header, &request);
3438                 break;
3439
3440         case CTDB_CONTROL_GET_NODES_FILE:
3441                 control_get_nodes_file(mem_ctx, req, &header, &request);
3442                 break;
3443
3444         case CTDB_CONTROL_CHECK_PID_SRVID:
3445                 control_check_pid_srvid(mem_ctx, req, &header, &request);
3446                 break;
3447
3448         default:
3449                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
3450                         control_error(mem_ctx, req, &header, &request);
3451                 }
3452                 break;
3453         }
3454
3455 done:
3456         talloc_free(mem_ctx);
3457 }
3458
3459 static int client_recv(struct tevent_req *req, int *perr)
3460 {
3461         struct client_state *state = tevent_req_data(
3462                 req, struct client_state);
3463         int err;
3464
3465         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
3466         close(state->fd);
3467
3468         if (tevent_req_is_unix_error(req, &err)) {
3469                 if (perr != NULL) {
3470                         *perr = err;
3471                 }
3472                 return -1;
3473         }
3474
3475         return state->status;
3476 }
3477
3478 /*
3479  * Fake CTDB server
3480  */
3481
/* State of the server request created by server_send() */
struct server_state {
        /* Event context used for the accept and per-client requests */
        struct tevent_context *ev;
        /* Fake daemon context handed to every client_send() */
        struct ctdbd_context *ctdb;
        /* Socket passed to accept_send() to wait for new connections */
        int fd;
};
3487
3488 static void server_new_client(struct tevent_req *subreq);
3489 static void server_client_done(struct tevent_req *subreq);
3490
3491 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
3492                                       struct tevent_context *ev,
3493                                       struct ctdbd_context *ctdb,
3494                                       int fd)
3495 {
3496         struct tevent_req *req, *subreq;
3497         struct server_state *state;
3498
3499         req = tevent_req_create(mem_ctx, &state, struct server_state);
3500         if (req == NULL) {
3501                 return NULL;
3502         }
3503
3504         state->ev = ev;
3505         state->ctdb = ctdb;
3506         state->fd = fd;
3507
3508         subreq = accept_send(state, ev, fd);
3509         if (tevent_req_nomem(subreq, req)) {
3510                 return tevent_req_post(req, ev);
3511         }
3512         tevent_req_set_callback(subreq, server_new_client, req);
3513
3514         return req;
3515 }
3516
3517 static void server_new_client(struct tevent_req *subreq)
3518 {
3519         struct tevent_req *req = tevent_req_callback_data(
3520                 subreq, struct tevent_req);
3521         struct server_state *state = tevent_req_data(
3522                 req, struct server_state);
3523         struct ctdbd_context *ctdb = state->ctdb;
3524         int client_fd;
3525         int ret = 0;
3526
3527         client_fd = accept_recv(subreq, NULL, NULL, &ret);
3528         TALLOC_FREE(subreq);
3529         if (client_fd == -1) {
3530                 tevent_req_error(req, ret);
3531                 return;
3532         }
3533
3534         subreq = client_send(state, state->ev, client_fd,
3535                              ctdb, ctdb->node_map->pnn);
3536         if (tevent_req_nomem(subreq, req)) {
3537                 return;
3538         }
3539         tevent_req_set_callback(subreq, server_client_done, req);
3540
3541         ctdb->num_clients += 1;
3542
3543         subreq = accept_send(state, state->ev, state->fd);
3544         if (tevent_req_nomem(subreq, req)) {
3545                 return;
3546         }
3547         tevent_req_set_callback(subreq, server_new_client, req);
3548 }
3549
3550 static void server_client_done(struct tevent_req *subreq)
3551 {
3552         struct tevent_req *req = tevent_req_callback_data(
3553                 subreq, struct tevent_req);
3554         struct server_state *state = tevent_req_data(
3555                 req, struct server_state);
3556         struct ctdbd_context *ctdb = state->ctdb;
3557         int ret = 0;
3558         int status;
3559
3560         status = client_recv(subreq, &ret);
3561         TALLOC_FREE(subreq);
3562         if (status < 0) {
3563                 tevent_req_error(req, ret);
3564                 return;
3565         }
3566
3567         ctdb->num_clients -= 1;
3568
3569         if (status == 99) {
3570                 /* Special status, to shutdown server */
3571                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
3572                 tevent_req_done(req);
3573         }
3574 }
3575
/*
 * Collect the result of the server request.
 *
 * Returns true if the request finished without error.  On error, false is
 * returned and the error code is stored in *perr when perr is not NULL.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return ! failed;
}
3588
3589 /*
3590  * Main functions
3591  */
3592
3593 static int socket_init(const char *sockpath)
3594 {
3595         struct sockaddr_un addr;
3596         size_t len;
3597         int ret, fd;
3598
3599         memset(&addr, 0, sizeof(addr));
3600         addr.sun_family = AF_UNIX;
3601
3602         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
3603         if (len >= sizeof(addr.sun_path)) {
3604                 fprintf(stderr, "path too long: %s\n", sockpath);
3605                 return -1;
3606         }
3607
3608         fd = socket(AF_UNIX, SOCK_STREAM, 0);
3609         if (fd == -1) {
3610                 fprintf(stderr, "socket failed - %s\n", sockpath);
3611                 return -1;
3612         }
3613
3614         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
3615         if (ret != 0) {
3616                 fprintf(stderr, "bind failed - %s\n", sockpath);
3617                 goto fail;
3618         }
3619
3620         ret = listen(fd, 10);
3621         if (ret != 0) {
3622                 fprintf(stderr, "listen failed\n");
3623                 goto fail;
3624         }
3625
3626         DEBUG(DEBUG_INFO, ("Socket init done\n"));
3627
3628         return fd;
3629
3630 fail:
3631         if (fd != -1) {
3632                 close(fd);
3633         }
3634         return -1;
3635 }
3636
/* Command line settings; the pointers are filled in by popt. */
struct options {
        const char *sockpath;   /* --socket: Unix domain socket path */
        const char *pidfile;    /* --pidfile: pid file */
        const char *debuglevel; /* --debug: debug level */
};

static struct options options;
3642
3643 static struct poptOption cmdline_options[] = {
3644         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
3645                 "Unix domain socket path", "filename" },
3646         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
3647                 "pid file", "filename" } ,
3648         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
3649                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
3650 };
3651
3652 static void cleanup(void)
3653 {
3654         unlink(options.sockpath);
3655         unlink(options.pidfile);
3656 }
3657
/*
 * Signal handler: remove the socket and pid file, then terminate with
 * exit status 0.  The signal number is not used.
 */
static void signal_handler(int sig)
{
        (void) sig;

        cleanup();
        exit(0);
}
3663
3664 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
3665                          struct ctdbd_context *ctdb, int fd, int pfd)
3666 {
3667         struct tevent_req *req;
3668         int ret = 0;
3669         ssize_t len;
3670
3671         atexit(cleanup);
3672         signal(SIGTERM, signal_handler);
3673
3674         req = server_send(mem_ctx, ev, ctdb, fd);
3675         if (req == NULL) {
3676                 fprintf(stderr, "Memory error\n");
3677                 exit(1);
3678         }
3679
3680         len = write(pfd, &ret, sizeof(ret));
3681         if (len != sizeof(ret)) {
3682                 fprintf(stderr, "Failed to send message to parent\n");
3683                 exit(1);
3684         }
3685         close(pfd);
3686
3687         tevent_req_poll(req, ev);
3688
3689         server_recv(req, &ret);
3690         if (ret != 0) {
3691                 exit(1);
3692         }
3693 }
3694
3695 int main(int argc, const char *argv[])
3696 {
3697         TALLOC_CTX *mem_ctx;
3698         struct ctdbd_context *ctdb;
3699         struct tevent_context *ev;
3700         poptContext pc;
3701         int opt, fd, ret, pfd[2];
3702         ssize_t len;
3703         pid_t pid;
3704         FILE *fp;
3705
3706         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
3707                             POPT_CONTEXT_KEEP_FIRST);
3708         while ((opt = poptGetNextOpt(pc)) != -1) {
3709                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
3710                 exit(1);
3711         }
3712
3713         if (options.sockpath == NULL) {
3714                 fprintf(stderr, "Please specify socket path\n");
3715                 poptPrintHelp(pc, stdout, 0);
3716                 exit(1);
3717         }
3718
3719         if (options.pidfile == NULL) {
3720                 fprintf(stderr, "Please specify pid file\n");
3721                 poptPrintHelp(pc, stdout, 0);
3722                 exit(1);
3723         }
3724
3725         mem_ctx = talloc_new(NULL);
3726         if (mem_ctx == NULL) {
3727                 fprintf(stderr, "Memory error\n");
3728                 exit(1);
3729         }
3730
3731         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
3732         if (ret != 0) {
3733                 fprintf(stderr, "Invalid debug level\n");
3734                 poptPrintHelp(pc, stdout, 0);
3735                 exit(1);
3736         }
3737
3738         ctdb = ctdbd_setup(mem_ctx);
3739         if (ctdb == NULL) {
3740                 exit(1);
3741         }
3742
3743         if (! ctdbd_verify(ctdb)) {
3744                 exit(1);
3745         }
3746
3747         ev = tevent_context_init(mem_ctx);
3748         if (ev == NULL) {
3749                 fprintf(stderr, "Memory error\n");
3750                 exit(1);
3751         }
3752
3753         fd = socket_init(options.sockpath);
3754         if (fd == -1) {
3755                 exit(1);
3756         }
3757
3758         ret = pipe(pfd);
3759         if (ret != 0) {
3760                 fprintf(stderr, "Failed to create pipe\n");
3761                 cleanup();
3762                 exit(1);
3763         }
3764
3765         pid = fork();
3766         if (pid == -1) {
3767                 fprintf(stderr, "Failed to fork\n");
3768                 cleanup();
3769                 exit(1);
3770         }
3771
3772         if (pid == 0) {
3773                 /* Child */
3774                 close(pfd[0]);
3775                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
3776                 exit(1);
3777         }
3778
3779         /* Parent */
3780         close(pfd[1]);
3781
3782         len = read(pfd[0], &ret, sizeof(ret));
3783         close(pfd[0]);
3784         if (len != sizeof(ret)) {
3785                 fprintf(stderr, "len = %zi\n", len);
3786                 fprintf(stderr, "Failed to get message from child\n");
3787                 kill(pid, SIGTERM);
3788                 exit(1);
3789         }
3790
3791         fp = fopen(options.pidfile, "w");
3792         if (fp == NULL) {
3793                 fprintf(stderr, "Failed to open pid file %s\n",
3794                         options.pidfile);
3795                 kill(pid, SIGTERM);
3796                 exit(1);
3797         }
3798         fprintf(fp, "%d\n", pid);
3799         fclose(fp);
3800
3801         return 0;
3802 }