ctdb-tests: Add debug messages for unimplemented functions
[amitay/samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
35
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
40
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45
46 #include "ipalloc_read_known_ips.h"
47
48
/* Port set on every node address built by the parsers in this file */
#define CTDB_PORT 4379

/* A fake flag that is only supported by some functions */
#define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
53
/* One node of the fake cluster, as filled in by nodemap_parse() */
struct node {
        /* Node address; nodemap_parse() sets the port to CTDB_PORT */
        ctdb_sock_addr addr;
        uint32_t pnn;
        /* Flags from the input; may also carry NODE_FLAGS_FAKE_TIMEOUT */
        uint32_t flags;
        /* CTDB_CAP_* bits; CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER by default */
        uint32_t capabilities;
        /* Initialised to false by nodemap_parse() */
        bool recovery_disabled;
        /* Initialised to NULL by nodemap_parse() */
        void *recovery_substate;
};
62
/* The set of fake nodes plus the PNNs tagged CURRENT and RECMASTER */
struct node_map {
        /* Number of valid entries in node[] */
        uint32_t num_nodes;
        /* talloc array, grown one entry at a time by nodemap_parse() */
        struct node *node;
        /* PNN of the node tagged CURRENT; CTDB_UNKNOWN_PNN until set */
        uint32_t pnn;
        /* PNN of the node tagged RECMASTER; CTDB_UNKNOWN_PNN until set */
        uint32_t recmaster;
};
69
/* One interface entry, as read by interfaces_parse() */
struct interface {
        /* talloc copy of the name field */
        const char *name;
        /* true when the parsed link state field is non-zero */
        bool link_up;
        /* Value of the references field */
        uint32_t references;
};
75
/* Growable array of interfaces */
struct interface_map {
        /* Number of valid entries in iface[] */
        int num;
        /* talloc array, grown one entry at a time by interfaces_parse() */
        struct interface *iface;
};
80
/* VNN map state, initialised by vnnmap_init() and read by vnnmap_parse() */
struct vnn_map {
        /* Starts out as CTDB_RECOVERY_ACTIVE */
        uint32_t recmode;
        /* INVALID_GENERATION until the first input line has been read */
        uint32_t generation;
        /* Number of valid entries in map[] */
        uint32_t size;
        /* talloc array with one value per input line after the generation */
        uint32_t *map;
};
87
/* One fake database; entries are linked into database_map.db */
struct database {
        /* List linkage used by DLIST_ADD_END() */
        struct database *prev, *next;
        const char *name;
        /* "<dbdir>/<name>" */
        const char *path;
        /* Set by database_new(); the ltdb_*() helpers return EINVAL
         * while this is NULL */
        struct tdb_context *tdb;
        /* Parsed from the dbmap input, or a hash of the name when
         * created by database_new() */
        uint32_t id;
        /* CTDB_DB_FLAGS_* bits */
        uint8_t flags;
        /* Sequence number reported by database_seqnum() while tdb is NULL */
        uint64_t seq_num;
};
97
/* List of fake databases and the directory their paths are built from */
struct database_map {
        /* Head of the database list */
        struct database *db;
        /* talloc copy of the directory passed to dbmap_init() */
        const char *dbdir;
};
102
/* List entry pairing a control opcode and PNN with an error and comment.
 * NOTE(review): no user of this struct is visible in this part of the
 * file; presumably it describes a control that should be made to fail -
 * confirm against the code that fills the list. */
struct fake_control_failure {
        struct fake_control_failure  *prev, *next;
        enum ctdb_controls opcode;
        uint32_t pnn;
        const char *error;
        const char *comment;
};
110
/* List entry for a client, with a back pointer to the daemon context.
 * NOTE(review): the code that creates and uses clients is not visible in
 * this part of the file - the meaning of pid and state is to be
 * confirmed there. */
struct ctdb_client {
        struct ctdb_client *prev, *next;
        struct ctdbd_context *ctdb;
        pid_t pid;
        void *state;
};
117
/* Top-level state of the fake daemon.
 * NOTE(review): only the fields commented below are touched by the code
 * visible in this part of the file; the remaining fields are used
 * elsewhere. */
struct ctdbd_context {
        /* Presumably built by nodemap_init()/nodemap_parse() - confirm */
        struct node_map *node_map;
        /* Presumably built by interfaces_init()/interfaces_parse() - confirm */
        struct interface_map *iface_map;
        /* Presumably built by vnnmap_init()/vnnmap_parse() - confirm */
        struct vnn_map *vnn_map;
        /* Presumably built by dbmap_init()/dbmap_parse() - confirm */
        struct database_map *db_map;
        struct srvid_context *srv;
        int num_clients;
        struct timeval start_time;
        struct timeval recovery_start_time;
        struct timeval recovery_end_time;
        bool takeover_disabled;
        int log_level;
        enum ctdb_runstate runstate;
        struct ctdb_tunable_list tun_list;
        /* talloc copy of the line read by reclock_parse(); not assigned
         * when that line is blank */
        char *reclock;
        struct ctdb_public_ip_list *known_ips;
        struct fake_control_failure *control_failures;
        struct ctdb_client *client_list;
};
137
138 /*
139  * Parse routines
140  */
141
142 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
143 {
144         struct node_map *node_map;
145
146         node_map = talloc_zero(mem_ctx, struct node_map);
147         if (node_map == NULL) {
148                 return NULL;
149         }
150
151         node_map->pnn = CTDB_UNKNOWN_PNN;
152         node_map->recmaster = CTDB_UNKNOWN_PNN;
153
154         return node_map;
155 }
156
157 /* Read a nodemap from stdin.  Each line looks like:
158  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
159  * EOF or a blank line terminates input.
160  *
161  * By default, capablities for each node are
162  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
163  * capabilities can be faked off by adding, for example,
164  * -CTDB_CAP_RECMASTER.
165  */
166
167 static bool nodemap_parse(struct node_map *node_map)
168 {
169         char line[1024];
170
171         while ((fgets(line, sizeof(line), stdin) != NULL)) {
172                 uint32_t pnn, flags, capabilities;
173                 char *tok, *t;
174                 char *ip;
175                 ctdb_sock_addr saddr;
176                 struct node *node;
177                 int ret;
178
179                 if (line[0] == '\n') {
180                         break;
181                 }
182
183                 /* Get rid of pesky newline */
184                 if ((t = strchr(line, '\n')) != NULL) {
185                         *t = '\0';
186                 }
187
188                 /* Get PNN */
189                 tok = strtok(line, " \t");
190                 if (tok == NULL) {
191                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
192                         continue;
193                 }
194                 pnn = (uint32_t)strtoul(tok, NULL, 0);
195
196                 /* Get IP */
197                 tok = strtok(NULL, " \t");
198                 if (tok == NULL) {
199                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
200                         continue;
201                 }
202                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
203                 if (ret != 0) {
204                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
205                         continue;
206                 }
207                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
208                 ip = talloc_strdup(node_map, tok);
209                 if (ip == NULL) {
210                         goto fail;
211                 }
212
213                 /* Get flags */
214                 tok = strtok(NULL, " \t");
215                 if (tok == NULL) {
216                         fprintf(stderr, "bad line (%s) - missing flags\n",
217                                 line);
218                         continue;
219                 }
220                 flags = (uint32_t)strtoul(tok, NULL, 0);
221                 /* Handle deleted nodes */
222                 if (flags & NODE_FLAGS_DELETED) {
223                         talloc_free(ip);
224                         ip = talloc_strdup(node_map, "0.0.0.0");
225                         if (ip == NULL) {
226                                 goto fail;
227                         }
228                 }
229                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
230
231                 tok = strtok(NULL, " \t");
232                 while (tok != NULL) {
233                         if (strcmp(tok, "CURRENT") == 0) {
234                                 node_map->pnn = pnn;
235                         } else if (strcmp(tok, "RECMASTER") == 0) {
236                                 node_map->recmaster = pnn;
237                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
238                                 capabilities &= ~CTDB_CAP_RECMASTER;
239                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
240                                 capabilities &= ~CTDB_CAP_LMASTER;
241                         } else if (strcmp(tok, "TIMEOUT") == 0) {
242                                 /* This can be done with just a flag
243                                  * value but it is probably clearer
244                                  * and less error-prone to fake this
245                                  * with an explicit token */
246                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
247                         }
248                         tok = strtok(NULL, " \t");
249                 }
250
251                 node_map->node = talloc_realloc(node_map, node_map->node,
252                                                 struct node,
253                                                 node_map->num_nodes + 1);
254                 if (node_map->node == NULL) {
255                         goto fail;
256                 }
257                 node = &node_map->node[node_map->num_nodes];
258
259                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
260                 if (ret != 0) {
261                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
262                         continue;
263                 }
264                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
265                 node->pnn = pnn;
266                 node->flags = flags;
267                 node->capabilities = capabilities;
268                 node->recovery_disabled = false;
269                 node->recovery_substate = NULL;
270
271                 node_map->num_nodes += 1;
272         }
273
274         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
275         return true;
276
277 fail:
278         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
279         return false;
280
281 }
282
283 /* Append a node to a node map with given address and flags */
284 static bool node_map_add(struct ctdb_node_map *nodemap,
285                          const char *nstr, uint32_t flags)
286 {
287         ctdb_sock_addr addr;
288         uint32_t num;
289         struct ctdb_node_and_flags *n;
290         int ret;
291
292         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
293         if (ret != 0) {
294                 fprintf(stderr, "Invalid IP address %s\n", nstr);
295                 return false;
296         }
297         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
298
299         num = nodemap->num;
300         nodemap->node = talloc_realloc(nodemap, nodemap->node,
301                                        struct ctdb_node_and_flags, num+1);
302         if (nodemap->node == NULL) {
303                 return false;
304         }
305
306         n = &nodemap->node[num];
307         n->addr = addr;
308         n->pnn = num;
309         n->flags = flags;
310
311         nodemap->num = num+1;
312         return true;
313 }
314
315 /* Read a nodes file into a node map */
316 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
317                                                   const char *nlist)
318 {
319         char **lines;
320         int nlines;
321         int i;
322         struct ctdb_node_map *nodemap;
323
324         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
325         if (nodemap == NULL) {
326                 return NULL;
327         }
328
329         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
330         if (lines == NULL) {
331                 return NULL;
332         }
333
334         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
335                 nlines--;
336         }
337
338         for (i=0; i<nlines; i++) {
339                 char *node;
340                 uint32_t flags;
341                 size_t len;
342
343                 node = lines[i];
344                 /* strip leading spaces */
345                 while((*node == ' ') || (*node == '\t')) {
346                         node++;
347                 }
348
349                 len = strlen(node);
350
351                 /* strip trailing spaces */
352                 while ((len > 1) &&
353                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
354                 {
355                         node[len-1] = '\0';
356                         len--;
357                 }
358
359                 if (len == 0) {
360                         continue;
361                 }
362                 if (*node == '#') {
363                         /* A "deleted" node is a node that is
364                            commented out in the nodes file.  This is
365                            used instead of removing a line, which
366                            would cause subsequent nodes to change
367                            their PNN. */
368                         flags = NODE_FLAGS_DELETED;
369                         node = discard_const("0.0.0.0");
370                 } else {
371                         flags = 0;
372                 }
373                 if (! node_map_add(nodemap, node, flags)) {
374                         talloc_free(lines);
375                         TALLOC_FREE(nodemap);
376                         return NULL;
377                 }
378         }
379
380         talloc_free(lines);
381         return nodemap;
382 }
383
384 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
385                                              uint32_t pnn)
386 {
387         struct ctdb_node_map *nodemap;
388         char nodes_list[PATH_MAX];
389         const char *ctdb_base;
390         int num;
391
392         ctdb_base = getenv("CTDB_BASE");
393         if (ctdb_base == NULL) {
394                 D_ERR("CTDB_BASE is not set\n");
395                 return NULL;
396         }
397
398         /* read optional node-specific nodes file */
399         num = snprintf(nodes_list, sizeof(nodes_list),
400                        "%s/nodes.%d", ctdb_base, pnn);
401         if (num == sizeof(nodes_list)) {
402                 D_ERR("nodes file path too long\n");
403                 return NULL;
404         }
405         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
406         if (nodemap != NULL) {
407                 /* Fake a load failure for an empty nodemap */
408                 if (nodemap->num == 0) {
409                         talloc_free(nodemap);
410
411                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
412                         return NULL;
413                 }
414
415                 return nodemap;
416         }
417
418         /* read normal nodes file */
419         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
420         if (num == sizeof(nodes_list)) {
421                 D_ERR("nodes file path too long\n");
422                 return NULL;
423         }
424         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
425         if (nodemap != NULL) {
426                 return nodemap;
427         }
428
429         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
430         return NULL;
431 }
432
433 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
434 {
435         struct interface_map *iface_map;
436
437         iface_map = talloc_zero(mem_ctx, struct interface_map);
438         if (iface_map == NULL) {
439                 return NULL;
440         }
441
442         return iface_map;
443 }
444
445 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
446  * output:
447  *   :Name:LinkStatus:References:
448  *   :eth2:1:4294967294
449  *   :eth1:1:4294967292
450  */
451
452 static bool interfaces_parse(struct interface_map *iface_map)
453 {
454         char line[1024];
455
456         while ((fgets(line, sizeof(line), stdin) != NULL)) {
457                 uint16_t link_state;
458                 uint32_t references;
459                 char *tok, *t, *name;
460                 struct interface *iface;
461
462                 if (line[0] == '\n') {
463                         break;
464                 }
465
466                 /* Get rid of pesky newline */
467                 if ((t = strchr(line, '\n')) != NULL) {
468                         *t = '\0';
469                 }
470
471                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
472                         continue;
473                 }
474
475                 /* Leading colon... */
476                 // tok = strtok(line, ":");
477
478                 /* name */
479                 tok = strtok(line, ":");
480                 if (tok == NULL) {
481                         fprintf(stderr, "bad line (%s) - missing name\n", line);
482                         continue;
483                 }
484                 name = tok;
485
486                 /* link_state */
487                 tok = strtok(NULL, ":");
488                 if (tok == NULL) {
489                         fprintf(stderr, "bad line (%s) - missing link state\n",
490                                 line);
491                         continue;
492                 }
493                 link_state = (uint16_t)strtoul(tok, NULL, 0);
494
495                 /* references... */
496                 tok = strtok(NULL, ":");
497                 if (tok == NULL) {
498                         fprintf(stderr, "bad line (%s) - missing references\n",
499                                 line);
500                         continue;
501                 }
502                 references = (uint32_t)strtoul(tok, NULL, 0);
503
504                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
505                                                   struct interface,
506                                                   iface_map->num + 1);
507                 if (iface_map->iface == NULL) {
508                         goto fail;
509                 }
510
511                 iface = &iface_map->iface[iface_map->num];
512
513                 iface->name = talloc_strdup(iface_map, name);
514                 if (iface->name == NULL) {
515                         goto fail;
516                 }
517                 iface->link_up = link_state;
518                 iface->references = references;
519
520                 iface_map->num += 1;
521         }
522
523         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
524         return true;
525
526 fail:
527         fprintf(stderr, "Parsing interfaces failed\n");
528         return false;
529 }
530
531 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
532 {
533         struct vnn_map *vnn_map;
534
535         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
536         if (vnn_map == NULL) {
537                 fprintf(stderr, "Memory error\n");
538                 return NULL;
539         }
540         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
541         vnn_map->generation = INVALID_GENERATION;
542
543         return vnn_map;
544 }
545
546 /* Read vnn map.
547  * output:
548  *   <GENERATION>
549  *   <LMASTER0>
550  *   <LMASTER1>
551  *   ...
552  */
553
554 static bool vnnmap_parse(struct vnn_map *vnn_map)
555 {
556         char line[1024];
557
558         while (fgets(line, sizeof(line), stdin) != NULL) {
559                 uint32_t n;
560                 char *t;
561
562                 if (line[0] == '\n') {
563                         break;
564                 }
565
566                 /* Get rid of pesky newline */
567                 if ((t = strchr(line, '\n')) != NULL) {
568                         *t = '\0';
569                 }
570
571                 n = (uint32_t) strtol(line, NULL, 0);
572
573                 /* generation */
574                 if (vnn_map->generation == INVALID_GENERATION) {
575                         vnn_map->generation = n;
576                         continue;
577                 }
578
579                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
580                                               vnn_map->size + 1);
581                 if (vnn_map->map == NULL) {
582                         fprintf(stderr, "Memory error\n");
583                         goto fail;
584                 }
585
586                 vnn_map->map[vnn_map->size] = n;
587                 vnn_map->size += 1;
588         }
589
590         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
591         return true;
592
593 fail:
594         fprintf(stderr, "Parsing vnnmap failed\n");
595         return false;
596 }
597
598 static bool reclock_parse(struct ctdbd_context *ctdb)
599 {
600         char line[1024];
601         char *t;
602
603         if (fgets(line, sizeof(line), stdin) == NULL) {
604                 goto fail;
605         }
606
607         if (line[0] == '\n') {
608                 /* Recovery lock remains unset */
609                 goto ok;
610         }
611
612         /* Get rid of pesky newline */
613         if ((t = strchr(line, '\n')) != NULL) {
614                 *t = '\0';
615         }
616
617         ctdb->reclock = talloc_strdup(ctdb, line);
618         if (ctdb->reclock == NULL) {
619                 goto fail;
620         }
621 ok:
622         /* Swallow possible blank line following section.  Picky
623          * compiler settings don't allow the return value to be
624          * ignored, so make the compiler happy.
625          */
626         if (fgets(line, sizeof(line), stdin) == NULL) {
627                 ;
628         }
629         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
630         return true;
631
632 fail:
633         fprintf(stderr, "Parsing reclock failed\n");
634         return false;
635 }
636
637 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
638                                        const char *dbdir)
639 {
640         struct database_map *db_map;
641
642         db_map = talloc_zero(mem_ctx, struct database_map);
643         if (db_map == NULL) {
644                 return NULL;
645         }
646
647         db_map->dbdir = talloc_strdup(db_map, dbdir);
648         if (db_map->dbdir == NULL) {
649                 talloc_free(db_map);
650                 return NULL;
651         }
652
653         return db_map;
654 }
655
656 /* Read a database map from stdin.  Each line looks like:
657  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
658  * EOF or a blank line terminates input.
659  *
660  * By default, flags and seq_num are 0
661  */
662
663 static bool dbmap_parse(struct database_map *db_map)
664 {
665         char line[1024];
666
667         while ((fgets(line, sizeof(line), stdin) != NULL)) {
668                 uint32_t id;
669                 uint8_t flags = 0;
670                 uint32_t seq_num = 0;
671                 char *tok, *t;
672                 char *name;
673                 struct database *db;
674
675                 if (line[0] == '\n') {
676                         break;
677                 }
678
679                 /* Get rid of pesky newline */
680                 if ((t = strchr(line, '\n')) != NULL) {
681                         *t = '\0';
682                 }
683
684                 /* Get ID */
685                 tok = strtok(line, " \t");
686                 if (tok == NULL) {
687                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
688                         continue;
689                 }
690                 id = (uint32_t)strtoul(tok, NULL, 0);
691
692                 /* Get NAME */
693                 tok = strtok(NULL, " \t");
694                 if (tok == NULL) {
695                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
696                         continue;
697                 }
698                 name = talloc_strdup(db_map, tok);
699                 if (name == NULL) {
700                         goto fail;
701                 }
702
703                 /* Get flags */
704                 tok = strtok(NULL, " \t");
705                 while (tok != NULL) {
706                         if (strcmp(tok, "PERSISTENT") == 0) {
707                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
708                         } else if (strcmp(tok, "STICKY") == 0) {
709                                 flags |= CTDB_DB_FLAGS_STICKY;
710                         } else if (strcmp(tok, "READONLY") == 0) {
711                                 flags |= CTDB_DB_FLAGS_READONLY;
712                         } else if (strcmp(tok, "REPLICATED") == 0) {
713                                 flags |= CTDB_DB_FLAGS_REPLICATED;
714                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
715                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
716                                              CTDB_DB_FLAGS_REPLICATED;
717
718                                 if ((flags & nv) == 0) {
719                                         fprintf(stderr,
720                                                 "seq_num for volatile db\n");
721                                         goto fail;
722                                 }
723                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
724                         }
725
726                         tok = strtok(NULL, " \t");
727                 }
728
729                 db = talloc_zero(db_map, struct database);
730                 if (db == NULL) {
731                         goto fail;
732                 }
733
734                 db->id = id;
735                 db->name = talloc_steal(db, name);
736                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
737                 if (db->path == NULL) {
738                         talloc_free(db);
739                         goto fail;
740                 }
741                 db->flags = flags;
742                 db->seq_num = seq_num;
743
744                 DLIST_ADD_END(db_map->db, db);
745         }
746
747         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
748         return true;
749
750 fail:
751         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
752         return false;
753
754 }
755
756 static struct database *database_find(struct database_map *db_map,
757                                       uint32_t db_id)
758 {
759         struct database *db;
760
761         for (db = db_map->db; db != NULL; db = db->next) {
762                 if (db->id == db_id) {
763                         return db;
764                 }
765         }
766
767         return NULL;
768 }
769
770 static int database_count(struct database_map *db_map)
771 {
772         struct database *db;
773         int count = 0;
774
775         for (db = db_map->db; db != NULL; db = db->next) {
776                 count += 1;
777         }
778
779         return count;
780 }
781
782 static int database_flags(uint8_t db_flags)
783 {
784         int tdb_flags = 0;
785
786         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
787                 tdb_flags = TDB_DEFAULT;
788         } else {
789                 /* volatile and replicated use the same flags */
790                 tdb_flags = TDB_NOSYNC |
791                             TDB_CLEAR_IF_FIRST |
792                             TDB_INCOMPATIBLE_HASH;
793         }
794
795         tdb_flags |= TDB_DISALLOW_NESTING;
796
797         return tdb_flags;
798 }
799
800 static struct database *database_new(struct database_map *db_map,
801                                      const char *name, uint8_t flags)
802 {
803         struct database *db;
804         TDB_DATA key;
805         int tdb_flags;
806
807         db = talloc_zero(db_map, struct database);
808         if (db == NULL) {
809                 return NULL;
810         }
811
812         db->name = talloc_strdup(db, name);
813         if (db->name == NULL) {
814                 goto fail;
815         }
816
817         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
818         if (db->path == NULL) {
819                 goto fail;
820         }
821
822         key.dsize = strlen(db->name) + 1;
823         key.dptr = discard_const(db->name);
824
825         db->id = tdb_jenkins_hash(&key);
826         db->flags = flags;
827
828         tdb_flags = database_flags(flags);
829
830         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
831         if (db->tdb == NULL) {
832                 DBG_ERR("tdb_open\n");
833                 goto fail;
834         }
835
836         DLIST_ADD_END(db_map->db, db);
837         return db;
838
839 fail:
840         DBG_ERR("Memory error\n");
841         talloc_free(db);
842         return NULL;
843
844 }
845
846 static int ltdb_store(struct database *db, TDB_DATA key,
847                       struct ctdb_ltdb_header *header, TDB_DATA data)
848 {
849         int ret;
850         bool db_volatile = true;
851         bool keep = false;
852
853         if (db->tdb == NULL) {
854                 return EINVAL;
855         }
856
857         if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
858             (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
859                 db_volatile = false;
860         }
861
862         if (data.dsize > 0) {
863                 keep = true;
864         } else {
865                 if (db_volatile && header->rsn == 0) {
866                         keep = true;
867                 }
868         }
869
870         if (keep) {
871                 TDB_DATA rec[2];
872
873                 rec[0].dsize = ctdb_ltdb_header_len(header);
874                 rec[0].dptr = (uint8_t *)header;
875
876                 rec[1].dsize = data.dsize;
877                 rec[1].dptr = data.dptr;
878
879                 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
880         } else {
881                 if (header->rsn > 0) {
882                         ret = tdb_delete(db->tdb, key);
883                 } else {
884                         ret = 0;
885                 }
886         }
887
888         return ret;
889 }
890
891 static int ltdb_fetch(struct database *db, TDB_DATA key,
892                       struct ctdb_ltdb_header *header,
893                       TALLOC_CTX *mem_ctx, TDB_DATA *data)
894 {
895         TDB_DATA rec;
896         size_t np;
897         int ret;
898
899         if (db->tdb == NULL) {
900                 return EINVAL;
901         }
902
903         rec = tdb_fetch(db->tdb, key);
904         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
905         if (ret != 0) {
906                 if (rec.dptr != NULL) {
907                         free(rec.dptr);
908                 }
909
910                 *header = (struct ctdb_ltdb_header) {
911                         .rsn = 0,
912                         .dmaster = 0,
913                         .flags = 0,
914                 };
915
916                 ret = ltdb_store(db, key, header, tdb_null);
917                 if (ret != 0) {
918                         return ret;
919                 }
920
921                 *data = tdb_null;
922                 return 0;
923         }
924
925         data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
926         data->dptr = talloc_memdup(mem_ctx,
927                                    rec.dptr + ctdb_ltdb_header_len(header),
928                                    data->dsize);
929         if (data->dptr == NULL) {
930                 free(rec.dptr);
931                 return ENOMEM;
932         }
933
934         return 0;
935 }
936
937 static int database_seqnum(struct database *db, uint64_t *seqnum)
938 {
939         const char *keyname = CTDB_DB_SEQNUM_KEY;
940         TDB_DATA key, data;
941         struct ctdb_ltdb_header header;
942         size_t np;
943         int ret;
944
945         if (db->tdb == NULL) {
946                 *seqnum = db->seq_num;
947                 return 0;
948         }
949
950         key.dptr = discard_const(keyname);
951         key.dsize = strlen(keyname) + 1;
952
953         ret = ltdb_fetch(db, key, &header, db, &data);
954         if (ret != 0) {
955                 return ret;
956         }
957
958         if (data.dsize == 0) {
959                 *seqnum = 0;
960                 return 0;
961         }
962
963         ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
964         talloc_free(data.dptr);
965         if (ret != 0) {
966                 *seqnum = 0;
967         }
968
969         return ret;
970 }
971
972 static int ltdb_transaction_update(uint32_t reqid,
973                                    struct ctdb_ltdb_header *no_header,
974                                    TDB_DATA key, TDB_DATA data,
975                                    void *private_data)
976 {
977         struct database *db = (struct database *)private_data;
978         TALLOC_CTX *tmp_ctx = talloc_new(db);
979         struct ctdb_ltdb_header header = { 0 }, oldheader;
980         TDB_DATA olddata;
981         int ret;
982
983         if (db->tdb == NULL) {
984                 return EINVAL;
985         }
986
987         ret = ctdb_ltdb_header_extract(&data, &header);
988         if (ret != 0) {
989                 return ret;
990         }
991
992         ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
993         if (ret != 0) {
994                 return ret;
995         }
996
997         if (olddata.dsize > 0) {
998                 if (oldheader.rsn > header.rsn ||
999                     (oldheader.rsn == header.rsn &&
1000                      olddata.dsize != data.dsize)) {
1001                         return -1;
1002                 }
1003         }
1004
1005         talloc_free(tmp_ctx);
1006
1007         ret = ltdb_store(db, key, &header, data);
1008         return ret;
1009 }
1010
1011 static int ltdb_transaction(struct database *db,
1012                             struct ctdb_rec_buffer *recbuf)
1013 {
1014         int ret;
1015
1016         if (db->tdb == NULL) {
1017                 return EINVAL;
1018         }
1019
1020         ret = tdb_transaction_start(db->tdb);
1021         if (ret == -1) {
1022                 return ret;
1023         }
1024
1025         ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
1026         if (ret != 0) {
1027                 tdb_transaction_cancel(db->tdb);
1028         }
1029
1030         ret = tdb_transaction_commit(db->tdb);
1031         return ret;
1032 }
1033
1034 static bool public_ips_parse(struct ctdbd_context *ctdb,
1035                              uint32_t numnodes)
1036 {
1037         bool status;
1038
1039         if (numnodes == 0) {
1040                 D_ERR("Must initialise nodemap before public IPs\n");
1041                 return false;
1042         }
1043
1044         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
1045
1046         status = (ctdb->known_ips != NULL);
1047
1048         if (status) {
1049                 D_INFO("Parsing public IPs done\n");
1050         } else {
1051                 D_INFO("Parsing public IPs failed\n");
1052         }
1053
1054         return status;
1055 }
1056
1057 /* Read information about controls to fail.  Format is:
1058  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
1059  */
1060 static bool control_failures_parse(struct ctdbd_context *ctdb)
1061 {
1062         char line[1024];
1063
1064         while ((fgets(line, sizeof(line), stdin) != NULL)) {
1065                 char *tok, *t;
1066                 enum ctdb_controls opcode;
1067                 uint32_t pnn;
1068                 const char *error;
1069                 const char *comment;
1070                 struct fake_control_failure *failure = NULL;
1071
1072                 if (line[0] == '\n') {
1073                         break;
1074                 }
1075
1076                 /* Get rid of pesky newline */
1077                 if ((t = strchr(line, '\n')) != NULL) {
1078                         *t = '\0';
1079                 }
1080
1081                 /* Get opcode */
1082                 tok = strtok(line, " \t");
1083                 if (tok == NULL) {
1084                         D_ERR("bad line (%s) - missing opcode\n", line);
1085                         continue;
1086                 }
1087                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1088
1089                 /* Get PNN */
1090                 tok = strtok(NULL, " \t");
1091                 if (tok == NULL) {
1092                         D_ERR("bad line (%s) - missing PNN\n", line);
1093                         continue;
1094                 }
1095                 pnn = (uint32_t)strtoul(tok, NULL, 0);
1096
1097                 /* Get error */
1098                 tok = strtok(NULL, " \t");
1099                 if (tok == NULL) {
1100                         D_ERR("bad line (%s) - missing errno\n", line);
1101                         continue;
1102                 }
1103                 error = talloc_strdup(ctdb, tok);
1104                 if (error == NULL) {
1105                         goto fail;
1106                 }
1107                 if (strcmp(error, "ERROR") != 0 &&
1108                     strcmp(error, "TIMEOUT") != 0) {
1109                         D_ERR("bad line (%s) "
1110                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1111                               line);
1112                         goto fail;
1113                 }
1114
1115                 /* Get comment */
1116                 tok = strtok(NULL, "\n"); /* rest of line */
1117                 if (tok == NULL) {
1118                         D_ERR("bad line (%s) - missing comment\n", line);
1119                         continue;
1120                 }
1121                 comment = talloc_strdup(ctdb, tok);
1122                 if (comment == NULL) {
1123                         goto fail;
1124                 }
1125
1126                 failure = talloc_zero(ctdb, struct fake_control_failure);
1127                 if (failure == NULL) {
1128                         goto fail;
1129                 }
1130
1131                 failure->opcode = opcode;
1132                 failure->pnn = pnn;
1133                 failure->error = error;
1134                 failure->comment = comment;
1135
1136                 DLIST_ADD(ctdb->control_failures, failure);
1137         }
1138
1139         D_INFO("Parsing fake control failures done\n");
1140         return true;
1141
1142 fail:
1143         D_INFO("Parsing fake control failures failed\n");
1144         return false;
1145 }
1146
1147 /*
1148  * Manage clients
1149  */
1150
1151 static int ctdb_client_destructor(struct ctdb_client *client)
1152 {
1153         DLIST_REMOVE(client->ctdb->client_list, client);
1154         return 0;
1155 }
1156
1157 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1158                       void *client_state)
1159 {
1160         struct ctdb_client *client;
1161
1162         client = talloc_zero(client_state, struct ctdb_client);
1163         if (client == NULL) {
1164                 return ENOMEM;
1165         }
1166
1167         client->ctdb = ctdb;
1168         client->pid = client_pid;
1169         client->state = client_state;
1170
1171         DLIST_ADD(ctdb->client_list, client);
1172         talloc_set_destructor(client, ctdb_client_destructor);
1173         return 0;
1174 }
1175
1176 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1177 {
1178         struct ctdb_client *client;
1179
1180         for (client=ctdb->client_list; client != NULL; client=client->next) {
1181                 if (client->pid == client_pid) {
1182                         return client->state;
1183                 }
1184         }
1185
1186         return NULL;
1187 }
1188
1189 /*
1190  * CTDB context setup
1191  */
1192
1193 static uint32_t new_generation(uint32_t old_generation)
1194 {
1195         uint32_t generation;
1196
1197         while (1) {
1198                 generation = random();
1199                 if (generation != INVALID_GENERATION &&
1200                     generation != old_generation) {
1201                         break;
1202                 }
1203         }
1204
1205         return generation;
1206 }
1207
1208 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1209                                          const char *dbdir)
1210 {
1211         struct ctdbd_context *ctdb;
1212         char line[1024];
1213         bool status;
1214         int ret;
1215
1216         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1217         if (ctdb == NULL) {
1218                 return NULL;
1219         }
1220
1221         ctdb->node_map = nodemap_init(ctdb);
1222         if (ctdb->node_map == NULL) {
1223                 goto fail;
1224         }
1225
1226         ctdb->iface_map = interfaces_init(ctdb);
1227         if (ctdb->iface_map == NULL) {
1228                 goto fail;
1229         }
1230
1231         ctdb->vnn_map = vnnmap_init(ctdb);
1232         if (ctdb->vnn_map == NULL) {
1233                 goto fail;
1234         }
1235
1236         ctdb->db_map = dbmap_init(ctdb, dbdir);
1237         if (ctdb->db_map == NULL) {
1238                 goto fail;
1239         }
1240
1241         ret = srvid_init(ctdb, &ctdb->srv);
1242         if (ret != 0) {
1243                 goto fail;
1244         }
1245
1246         while (fgets(line, sizeof(line), stdin) != NULL) {
1247                 char *t;
1248
1249                 if ((t = strchr(line, '\n')) != NULL) {
1250                         *t = '\0';
1251                 }
1252
1253                 if (strcmp(line, "NODEMAP") == 0) {
1254                         status = nodemap_parse(ctdb->node_map);
1255                 } else if (strcmp(line, "IFACES") == 0) {
1256                         status = interfaces_parse(ctdb->iface_map);
1257                 } else if (strcmp(line, "VNNMAP") == 0) {
1258                         status = vnnmap_parse(ctdb->vnn_map);
1259                 } else if (strcmp(line, "DBMAP") == 0) {
1260                         status = dbmap_parse(ctdb->db_map);
1261                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1262                         status = public_ips_parse(ctdb,
1263                                                   ctdb->node_map->num_nodes);
1264                 } else if (strcmp(line, "RECLOCK") == 0) {
1265                         status = reclock_parse(ctdb);
1266                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1267                         status = control_failures_parse(ctdb);
1268                 } else {
1269                         fprintf(stderr, "Unknown line %s\n", line);
1270                         status = false;
1271                 }
1272
1273                 if (! status) {
1274                         goto fail;
1275                 }
1276         }
1277
1278         ctdb->start_time = tevent_timeval_current();
1279         ctdb->recovery_start_time = tevent_timeval_current();
1280         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1281         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1282                 ctdb->vnn_map->generation =
1283                         new_generation(ctdb->vnn_map->generation);
1284         }
1285         ctdb->recovery_end_time = tevent_timeval_current();
1286
1287         ctdb->log_level = DEBUG_ERR;
1288         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1289
1290         ctdb_tunable_set_defaults(&ctdb->tun_list);
1291
1292         return ctdb;
1293
1294 fail:
1295         TALLOC_FREE(ctdb);
1296         return NULL;
1297 }
1298
1299 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1300 {
1301         struct node *node;
1302         int i;
1303
1304         if (ctdb->node_map->num_nodes == 0) {
1305                 return true;
1306         }
1307
1308         /* Make sure all the nodes are in order */
1309         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1310                 node = &ctdb->node_map->node[i];
1311                 if (node->pnn != i) {
1312                         fprintf(stderr, "Expected node %u, found %u\n",
1313                                 i, node->pnn);
1314                         return false;
1315                 }
1316         }
1317
1318         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1319         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1320                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1321                 exit(0);
1322         }
1323
1324         return true;
1325 }
1326
1327 /*
1328  * Doing a recovery
1329  */
1330
/* Per-request state for the fake recovery (recover_send/recover_recv) */
struct recover_state {
        /* Event context on which the wakeup timers are scheduled */
        struct tevent_context *ev;
        /* Daemon context whose node map and VNN map are consulted */
        struct ctdbd_context *ctdb;
};
1335
1336 static int recover_check(struct tevent_req *req);
1337 static void recover_wait_done(struct tevent_req *subreq);
1338 static void recover_done(struct tevent_req *subreq);
1339
1340 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1341                                        struct tevent_context *ev,
1342                                        struct ctdbd_context *ctdb)
1343 {
1344         struct tevent_req *req;
1345         struct recover_state *state;
1346         int ret;
1347
1348         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1349         if (req == NULL) {
1350                 return NULL;
1351         }
1352
1353         state->ev = ev;
1354         state->ctdb = ctdb;
1355
1356         ret = recover_check(req);
1357         if (ret != 0) {
1358                 tevent_req_error(req, ret);
1359                 return tevent_req_post(req, ev);
1360         }
1361
1362         return req;
1363 }
1364
1365 static int recover_check(struct tevent_req *req)
1366 {
1367         struct recover_state *state = tevent_req_data(
1368                 req, struct recover_state);
1369         struct ctdbd_context *ctdb = state->ctdb;
1370         struct tevent_req *subreq;
1371         bool recovery_disabled;
1372         int i;
1373
1374         recovery_disabled = false;
1375         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1376                 if (ctdb->node_map->node[i].recovery_disabled) {
1377                         recovery_disabled = true;
1378                         break;
1379                 }
1380         }
1381
1382         subreq = tevent_wakeup_send(state, state->ev,
1383                                     tevent_timeval_current_ofs(1, 0));
1384         if (subreq == NULL) {
1385                 return ENOMEM;
1386         }
1387
1388         if (recovery_disabled) {
1389                 tevent_req_set_callback(subreq, recover_wait_done, req);
1390         } else {
1391                 ctdb->recovery_start_time = tevent_timeval_current();
1392                 tevent_req_set_callback(subreq, recover_done, req);
1393         }
1394
1395         return 0;
1396 }
1397
1398 static void recover_wait_done(struct tevent_req *subreq)
1399 {
1400         struct tevent_req *req = tevent_req_callback_data(
1401                 subreq, struct tevent_req);
1402         int ret;
1403         bool status;
1404
1405         status = tevent_wakeup_recv(subreq);
1406         TALLOC_FREE(subreq);
1407         if (! status) {
1408                 tevent_req_error(req, EIO);
1409                 return;
1410         }
1411
1412         ret = recover_check(req);
1413         if (ret != 0) {
1414                 tevent_req_error(req, ret);
1415         }
1416 }
1417
1418 static void recover_done(struct tevent_req *subreq)
1419 {
1420         struct tevent_req *req = tevent_req_callback_data(
1421                 subreq, struct tevent_req);
1422         struct recover_state *state = tevent_req_data(
1423                 req, struct recover_state);
1424         struct ctdbd_context *ctdb = state->ctdb;
1425         bool status;
1426
1427         status = tevent_wakeup_recv(subreq);
1428         TALLOC_FREE(subreq);
1429         if (! status) {
1430                 tevent_req_error(req, EIO);
1431                 return;
1432         }
1433
1434         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1435         ctdb->recovery_end_time = tevent_timeval_current();
1436         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1437
1438         tevent_req_done(req);
1439 }
1440
/*
 * Collect the result of a fake recovery.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }

        return false;
}
1454
1455 /*
1456  * Routines for ctdb_req_header
1457  */
1458
1459 static void header_fix_pnn(struct ctdb_req_header *header,
1460                            struct ctdbd_context *ctdb)
1461 {
1462         if (header->srcnode == CTDB_CURRENT_NODE) {
1463                 header->srcnode = ctdb->node_map->pnn;
1464         }
1465
1466         if (header->destnode == CTDB_CURRENT_NODE) {
1467                 header->destnode = ctdb->node_map->pnn;
1468         }
1469 }
1470
1471 static struct ctdb_req_header header_reply_call(
1472                                         struct ctdb_req_header *header,
1473                                         struct ctdbd_context *ctdb)
1474 {
1475         struct ctdb_req_header reply_header;
1476
1477         reply_header = (struct ctdb_req_header) {
1478                 .ctdb_magic = CTDB_MAGIC,
1479                 .ctdb_version = CTDB_PROTOCOL,
1480                 .generation = ctdb->vnn_map->generation,
1481                 .operation = CTDB_REPLY_CALL,
1482                 .destnode = header->srcnode,
1483                 .srcnode = header->destnode,
1484                 .reqid = header->reqid,
1485         };
1486
1487         return reply_header;
1488 }
1489
1490 static struct ctdb_req_header header_reply_control(
1491                                         struct ctdb_req_header *header,
1492                                         struct ctdbd_context *ctdb)
1493 {
1494         struct ctdb_req_header reply_header;
1495
1496         reply_header = (struct ctdb_req_header) {
1497                 .ctdb_magic = CTDB_MAGIC,
1498                 .ctdb_version = CTDB_PROTOCOL,
1499                 .generation = ctdb->vnn_map->generation,
1500                 .operation = CTDB_REPLY_CONTROL,
1501                 .destnode = header->srcnode,
1502                 .srcnode = header->destnode,
1503                 .reqid = header->reqid,
1504         };
1505
1506         return reply_header;
1507 }
1508
1509 static struct ctdb_req_header header_reply_message(
1510                                         struct ctdb_req_header *header,
1511                                         struct ctdbd_context *ctdb)
1512 {
1513         struct ctdb_req_header reply_header;
1514
1515         reply_header = (struct ctdb_req_header) {
1516                 .ctdb_magic = CTDB_MAGIC,
1517                 .ctdb_version = CTDB_PROTOCOL,
1518                 .generation = ctdb->vnn_map->generation,
1519                 .operation = CTDB_REQ_MESSAGE,
1520                 .destnode = header->srcnode,
1521                 .srcnode = header->destnode,
1522                 .reqid = 0,
1523         };
1524
1525         return reply_header;
1526 }
1527
1528 /*
1529  * Client state
1530  */
1531
/* State of one connected client (tevent request state) */
struct client_state {
        /* Event context used for writes back to the client */
        struct tevent_context *ev;
        int fd;
        /* Shared daemon context */
        struct ctdbd_context *ctdb;
        int pnn;
        pid_t pid;
        /* Communication context that replies are written to */
        struct comm_context *comm;
        struct srvid_register_state *rstate;
        int status;
};
1542
1543 /*
1544  * Send replies to call, controls and messages
1545  */
1546
1547 static void client_reply_done(struct tevent_req *subreq);
1548
1549 static void client_send_call(struct tevent_req *req,
1550                              struct ctdb_req_header *header,
1551                              struct ctdb_reply_call *reply)
1552 {
1553         struct client_state *state = tevent_req_data(
1554                 req, struct client_state);
1555         struct ctdbd_context *ctdb = state->ctdb;
1556         struct tevent_req *subreq;
1557         struct ctdb_req_header reply_header;
1558         uint8_t *buf;
1559         size_t datalen, buflen;
1560         int ret;
1561
1562         reply_header = header_reply_call(header, ctdb);
1563
1564         datalen = ctdb_reply_call_len(&reply_header, reply);
1565         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1566         if (ret != 0) {
1567                 tevent_req_error(req, ret);
1568                 return;
1569         }
1570
1571         ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
1572         if (ret != 0) {
1573                 tevent_req_error(req, ret);
1574                 return;
1575         }
1576
1577         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1578         if (tevent_req_nomem(subreq, req)) {
1579                 return;
1580         }
1581         tevent_req_set_callback(subreq, client_reply_done, req);
1582
1583         talloc_steal(subreq, buf);
1584 }
1585
1586 static void client_send_message(struct tevent_req *req,
1587                                 struct ctdb_req_header *header,
1588                                 struct ctdb_req_message_data *message)
1589 {
1590         struct client_state *state = tevent_req_data(
1591                 req, struct client_state);
1592         struct ctdbd_context *ctdb = state->ctdb;
1593         struct tevent_req *subreq;
1594         struct ctdb_req_header reply_header;
1595         uint8_t *buf;
1596         size_t datalen, buflen;
1597         int ret;
1598
1599         reply_header = header_reply_message(header, ctdb);
1600
1601         datalen = ctdb_req_message_data_len(&reply_header, message);
1602         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1603         if (ret != 0) {
1604                 tevent_req_error(req, ret);
1605                 return;
1606         }
1607
1608         ret = ctdb_req_message_data_push(&reply_header, message,
1609                                          buf, &buflen);
1610         if (ret != 0) {
1611                 tevent_req_error(req, ret);
1612                 return;
1613         }
1614
1615         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1616
1617         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1618         if (tevent_req_nomem(subreq, req)) {
1619                 return;
1620         }
1621         tevent_req_set_callback(subreq, client_reply_done, req);
1622
1623         talloc_steal(subreq, buf);
1624 }
1625
1626 static void client_send_control(struct tevent_req *req,
1627                                 struct ctdb_req_header *header,
1628                                 struct ctdb_reply_control *reply)
1629 {
1630         struct client_state *state = tevent_req_data(
1631                 req, struct client_state);
1632         struct ctdbd_context *ctdb = state->ctdb;
1633         struct tevent_req *subreq;
1634         struct ctdb_req_header reply_header;
1635         uint8_t *buf;
1636         size_t datalen, buflen;
1637         int ret;
1638
1639         reply_header = header_reply_control(header, ctdb);
1640
1641         datalen = ctdb_reply_control_len(&reply_header, reply);
1642         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1643         if (ret != 0) {
1644                 tevent_req_error(req, ret);
1645                 return;
1646         }
1647
1648         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1649         if (ret != 0) {
1650                 tevent_req_error(req, ret);
1651                 return;
1652         }
1653
1654         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1655
1656         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1657         if (tevent_req_nomem(subreq, req)) {
1658                 return;
1659         }
1660         tevent_req_set_callback(subreq, client_reply_done, req);
1661
1662         talloc_steal(subreq, buf);
1663 }
1664
1665 static void client_reply_done(struct tevent_req *subreq)
1666 {
1667         struct tevent_req *req = tevent_req_callback_data(
1668                 subreq, struct tevent_req);
1669         int ret;
1670         bool status;
1671
1672         status = comm_write_recv(subreq, &ret);
1673         TALLOC_FREE(subreq);
1674         if (! status) {
1675                 tevent_req_error(req, ret);
1676         }
1677 }
1678
1679 /*
1680  * Handling protocol - controls
1681  */
1682
1683 static void control_process_exists(TALLOC_CTX *mem_ctx,
1684                                    struct tevent_req *req,
1685                                    struct ctdb_req_header *header,
1686                                    struct ctdb_req_control *request)
1687 {
1688         struct client_state *state = tevent_req_data(
1689                 req, struct client_state);
1690         struct ctdbd_context *ctdb = state->ctdb;
1691         struct client_state *cstate;
1692         struct ctdb_reply_control reply;
1693
1694         reply.rdata.opcode = request->opcode;
1695
1696         cstate = client_find(ctdb, request->rdata.data.pid);
1697         if (cstate == NULL) {
1698                 reply.status = -1;
1699                 reply.errmsg = "No client for PID";
1700         } else {
1701                 reply.status = kill(request->rdata.data.pid, 0);
1702                 reply.errmsg = NULL;
1703         }
1704
1705         client_send_control(req, header, &reply);
1706 }
1707
1708 static void control_ping(TALLOC_CTX *mem_ctx,
1709                          struct tevent_req *req,
1710                          struct ctdb_req_header *header,
1711                          struct ctdb_req_control *request)
1712 {
1713         struct client_state *state = tevent_req_data(
1714                 req, struct client_state);
1715         struct ctdbd_context *ctdb = state->ctdb;
1716         struct ctdb_reply_control reply;
1717
1718         reply.rdata.opcode = request->opcode;
1719         reply.status = ctdb->num_clients;
1720         reply.errmsg = NULL;
1721
1722         client_send_control(req, header, &reply);
1723 }
1724
1725 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1726                               struct tevent_req *req,
1727                               struct ctdb_req_header *header,
1728                               struct ctdb_req_control *request)
1729 {
1730         struct client_state *state = tevent_req_data(
1731                 req, struct client_state);
1732         struct ctdbd_context *ctdb = state->ctdb;
1733         struct ctdb_reply_control reply;
1734         struct database *db;
1735
1736         reply.rdata.opcode = request->opcode;
1737
1738         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1739         if (db == NULL) {
1740                 reply.status = ENOENT;
1741                 reply.errmsg = "Database not found";
1742         } else {
1743                 reply.rdata.data.db_path =
1744                         talloc_strdup(mem_ctx, db->path);
1745                 if (reply.rdata.data.db_path == NULL) {
1746                         reply.status = ENOMEM;
1747                         reply.errmsg = "Memory error";
1748                 } else {
1749                         reply.status = 0;
1750                         reply.errmsg = NULL;
1751                 }
1752         }
1753
1754         client_send_control(req, header, &reply);
1755 }
1756
1757 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1758                               struct tevent_req *req,
1759                               struct ctdb_req_header *header,
1760                               struct ctdb_req_control *request)
1761 {
1762         struct client_state *state = tevent_req_data(
1763                 req, struct client_state);
1764         struct ctdbd_context *ctdb = state->ctdb;
1765         struct ctdb_reply_control reply;
1766         struct ctdb_vnn_map *vnnmap;
1767
1768         reply.rdata.opcode = request->opcode;
1769
1770         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1771         if (vnnmap == NULL) {
1772                 reply.status = ENOMEM;
1773                 reply.errmsg = "Memory error";
1774         } else {
1775                 vnnmap->generation = ctdb->vnn_map->generation;
1776                 vnnmap->size = ctdb->vnn_map->size;
1777                 vnnmap->map = ctdb->vnn_map->map;
1778
1779                 reply.rdata.data.vnnmap = vnnmap;
1780                 reply.status = 0;
1781                 reply.errmsg = NULL;
1782         }
1783
1784         client_send_control(req, header, &reply);
1785 }
1786
1787 static void control_get_debug(TALLOC_CTX *mem_ctx,
1788                               struct tevent_req *req,
1789                               struct ctdb_req_header *header,
1790                               struct ctdb_req_control *request)
1791 {
1792         struct client_state *state = tevent_req_data(
1793                 req, struct client_state);
1794         struct ctdbd_context *ctdb = state->ctdb;
1795         struct ctdb_reply_control reply;
1796
1797         reply.rdata.opcode = request->opcode;
1798         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1799         reply.status = 0;
1800         reply.errmsg = NULL;
1801
1802         client_send_control(req, header, &reply);
1803 }
1804
1805 static void control_set_debug(TALLOC_CTX *mem_ctx,
1806                               struct tevent_req *req,
1807                               struct ctdb_req_header *header,
1808                               struct ctdb_req_control *request)
1809 {
1810         struct client_state *state = tevent_req_data(
1811                 req, struct client_state);
1812         struct ctdbd_context *ctdb = state->ctdb;
1813         struct ctdb_reply_control reply;
1814
1815         ctdb->log_level = (int)request->rdata.data.loglevel;
1816
1817         reply.rdata.opcode = request->opcode;
1818         reply.status = 0;
1819         reply.errmsg = NULL;
1820
1821         client_send_control(req, header, &reply);
1822 }
1823
1824 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1825                               struct tevent_req *req,
1826                                struct ctdb_req_header *header,
1827                               struct ctdb_req_control *request)
1828 {
1829         struct client_state *state = tevent_req_data(
1830                 req, struct client_state);
1831         struct ctdbd_context *ctdb = state->ctdb;
1832         struct ctdb_reply_control reply;
1833         struct ctdb_dbid_map *dbmap;
1834         struct database *db;
1835         int i;
1836
1837         reply.rdata.opcode = request->opcode;
1838
1839         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1840         if (dbmap == NULL) {
1841                 goto fail;
1842         }
1843
1844         dbmap->num = database_count(ctdb->db_map);
1845         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1846         if (dbmap->dbs == NULL) {
1847                 goto fail;
1848         }
1849
1850         db = ctdb->db_map->db;
1851         for (i = 0; i < dbmap->num; i++) {
1852                 dbmap->dbs[i] = (struct ctdb_dbid) {
1853                         .db_id = db->id,
1854                         .flags = db->flags,
1855                 };
1856
1857                 db = db->next;
1858         }
1859
1860         reply.rdata.data.dbmap = dbmap;
1861         reply.status = 0;
1862         reply.errmsg = NULL;
1863         client_send_control(req, header, &reply);
1864         return;
1865
1866 fail:
1867         reply.status = -1;
1868         reply.errmsg = "Memory error";
1869         client_send_control(req, header, &reply);
1870 }
1871
1872 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1873                                 struct tevent_req *req,
1874                                 struct ctdb_req_header *header,
1875                                 struct ctdb_req_control *request)
1876 {
1877         struct client_state *state = tevent_req_data(
1878                 req, struct client_state);
1879         struct ctdbd_context *ctdb = state->ctdb;
1880         struct ctdb_reply_control reply;
1881
1882         reply.rdata.opcode = request->opcode;
1883         reply.status = ctdb->vnn_map->recmode;
1884         reply.errmsg = NULL;
1885
1886         client_send_control(req, header, &reply);
1887 }
1888
1889 struct set_recmode_state {
1890         struct tevent_req *req;
1891         struct ctdbd_context *ctdb;
1892         struct ctdb_req_header header;
1893         struct ctdb_reply_control reply;
1894 };
1895
1896 static void set_recmode_callback(struct tevent_req *subreq)
1897 {
1898         struct set_recmode_state *substate = tevent_req_callback_data(
1899                 subreq, struct set_recmode_state);
1900         bool status;
1901         int ret;
1902
1903         status = recover_recv(subreq, &ret);
1904         TALLOC_FREE(subreq);
1905         if (! status) {
1906                 substate->reply.status = ret;
1907                 substate->reply.errmsg = "recovery failed";
1908         } else {
1909                 substate->reply.status = 0;
1910                 substate->reply.errmsg = NULL;
1911         }
1912
1913         client_send_control(substate->req, &substate->header, &substate->reply);
1914         talloc_free(substate);
1915 }
1916
1917 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1918                                 struct tevent_req *req,
1919                                 struct ctdb_req_header *header,
1920                                 struct ctdb_req_control *request)
1921 {
1922         struct client_state *state = tevent_req_data(
1923                 req, struct client_state);
1924         struct tevent_req *subreq;
1925         struct ctdbd_context *ctdb = state->ctdb;
1926         struct set_recmode_state *substate;
1927         struct ctdb_reply_control reply;
1928
1929         reply.rdata.opcode = request->opcode;
1930
1931         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1932                 reply.status = -1;
1933                 reply.errmsg = "Client cannot set recmode to NORMAL";
1934                 goto fail;
1935         }
1936
1937         substate = talloc_zero(ctdb, struct set_recmode_state);
1938         if (substate == NULL) {
1939                 reply.status = -1;
1940                 reply.errmsg = "Memory error";
1941                 goto fail;
1942         }
1943
1944         substate->req = req;
1945         substate->ctdb = ctdb;
1946         substate->header = *header;
1947         substate->reply.rdata.opcode = request->opcode;
1948
1949         subreq = recover_send(substate, state->ev, state->ctdb);
1950         if (subreq == NULL) {
1951                 talloc_free(substate);
1952                 goto fail;
1953         }
1954         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1955
1956         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1957         return;
1958
1959 fail:
1960         client_send_control(req, header, &reply);
1961
1962 }
1963
1964 static void control_db_attach(TALLOC_CTX *mem_ctx,
1965                               struct tevent_req *req,
1966                               struct ctdb_req_header *header,
1967                               struct ctdb_req_control *request)
1968 {
1969         struct client_state *state = tevent_req_data(
1970                 req, struct client_state);
1971         struct ctdbd_context *ctdb = state->ctdb;
1972         struct ctdb_reply_control reply;
1973         struct database *db;
1974
1975         reply.rdata.opcode = request->opcode;
1976
1977         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
1978                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
1979                         goto done;
1980                 }
1981         }
1982
1983         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
1984         if (db == NULL) {
1985                 reply.status = -1;
1986                 reply.errmsg = "Failed to attach database";
1987                 client_send_control(req, header, &reply);
1988                 return;
1989         }
1990
1991 done:
1992         reply.rdata.data.db_id = db->id;
1993         reply.status = 0;
1994         reply.errmsg = NULL;
1995         client_send_control(req, header, &reply);
1996 }
1997
1998 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1999 {
2000         printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
2001 }
2002
2003 static void control_register_srvid(TALLOC_CTX *mem_ctx,
2004                                    struct tevent_req *req,
2005                                    struct ctdb_req_header *header,
2006                                    struct ctdb_req_control *request)
2007 {
2008         struct client_state *state = tevent_req_data(
2009                 req, struct client_state);
2010         struct ctdbd_context *ctdb = state->ctdb;
2011         struct ctdb_reply_control reply;
2012         int ret;
2013
2014         reply.rdata.opcode = request->opcode;
2015
2016         ret = srvid_register(ctdb->srv, state, request->srvid,
2017                              srvid_handler, state);
2018         if (ret != 0) {
2019                 reply.status = -1;
2020                 reply.errmsg = "Memory error";
2021                 goto fail;
2022         }
2023
2024         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
2025
2026         reply.status = 0;
2027         reply.errmsg = NULL;
2028
2029 fail:
2030         client_send_control(req, header, &reply);
2031 }
2032
2033 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
2034                                      struct tevent_req *req,
2035                                      struct ctdb_req_header *header,
2036                                      struct ctdb_req_control *request)
2037 {
2038         struct client_state *state = tevent_req_data(
2039                 req, struct client_state);
2040         struct ctdbd_context *ctdb = state->ctdb;
2041         struct ctdb_reply_control reply;
2042         int ret;
2043
2044         reply.rdata.opcode = request->opcode;
2045
2046         ret = srvid_deregister(ctdb->srv, request->srvid, state);
2047         if (ret != 0) {
2048                 reply.status = -1;
2049                 reply.errmsg = "srvid not registered";
2050                 goto fail;
2051         }
2052
2053         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
2054
2055         reply.status = 0;
2056         reply.errmsg = NULL;
2057
2058 fail:
2059         client_send_control(req, header, &reply);
2060 }
2061
2062 static void control_get_dbname(TALLOC_CTX *mem_ctx,
2063                                struct tevent_req *req,
2064                                struct ctdb_req_header *header,
2065                                struct ctdb_req_control *request)
2066 {
2067         struct client_state *state = tevent_req_data(
2068                 req, struct client_state);
2069         struct ctdbd_context *ctdb = state->ctdb;
2070         struct ctdb_reply_control reply;
2071         struct database *db;
2072
2073         reply.rdata.opcode = request->opcode;
2074
2075         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2076         if (db == NULL) {
2077                 reply.status = ENOENT;
2078                 reply.errmsg = "Database not found";
2079         } else {
2080                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
2081                 if (reply.rdata.data.db_name == NULL) {
2082                         reply.status = ENOMEM;
2083                         reply.errmsg = "Memory error";
2084                 } else {
2085                         reply.status = 0;
2086                         reply.errmsg = NULL;
2087                 }
2088         }
2089
2090         client_send_control(req, header, &reply);
2091 }
2092
2093 static void control_get_pid(TALLOC_CTX *mem_ctx,
2094                             struct tevent_req *req,
2095                             struct ctdb_req_header *header,
2096                             struct ctdb_req_control *request)
2097 {
2098         struct ctdb_reply_control reply;
2099
2100         reply.rdata.opcode = request->opcode;
2101         reply.status = getpid();
2102         reply.errmsg = NULL;
2103
2104         client_send_control(req, header, &reply);
2105 }
2106
2107 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
2108                                   struct tevent_req *req,
2109                                   struct ctdb_req_header *header,
2110                                   struct ctdb_req_control *request)
2111 {
2112         struct client_state *state = tevent_req_data(
2113                 req, struct client_state);
2114         struct ctdbd_context *ctdb = state->ctdb;
2115         struct ctdb_reply_control reply;
2116
2117         reply.rdata.opcode = request->opcode;
2118         reply.status = ctdb->node_map->recmaster;
2119         reply.errmsg = NULL;
2120
2121         client_send_control(req, header, &reply);
2122 }
2123
2124 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2125                             struct tevent_req *req,
2126                             struct ctdb_req_header *header,
2127                             struct ctdb_req_control *request)
2128 {
2129         struct ctdb_reply_control reply;
2130
2131         reply.rdata.opcode = request->opcode;
2132         reply.status = header->destnode;
2133         reply.errmsg = NULL;
2134
2135         client_send_control(req, header, &reply);
2136 }
2137
2138 static void control_shutdown(TALLOC_CTX *mem_ctx,
2139                              struct tevent_req *req,
2140                              struct ctdb_req_header *hdr,
2141                              struct ctdb_req_control *request)
2142 {
2143         struct client_state *state = tevent_req_data(
2144                 req, struct client_state);
2145
2146         state->status = 99;
2147 }
2148
2149 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2150                                 struct tevent_req *req,
2151                                 struct ctdb_req_header *header,
2152                                 struct ctdb_req_control *request)
2153 {
2154         struct client_state *state = tevent_req_data(
2155                 req, struct client_state);
2156         struct ctdbd_context *ctdb = state->ctdb;
2157         struct ctdb_reply_control reply;
2158         bool ret, obsolete;
2159
2160         reply.rdata.opcode = request->opcode;
2161         reply.errmsg = NULL;
2162
2163         ret = ctdb_tunable_set_value(&ctdb->tun_list,
2164                                      request->rdata.data.tunable->name,
2165                                      request->rdata.data.tunable->value,
2166                                      &obsolete);
2167         if (! ret) {
2168                 reply.status = -1;
2169         } else if (obsolete) {
2170                 reply.status = 1;
2171         } else {
2172                 reply.status = 0;
2173         }
2174
2175         client_send_control(req, header, &reply);
2176 }
2177
2178 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2179                                 struct tevent_req *req,
2180                                 struct ctdb_req_header *header,
2181                                 struct ctdb_req_control *request)
2182 {
2183         struct client_state *state = tevent_req_data(
2184                 req, struct client_state);
2185         struct ctdbd_context *ctdb = state->ctdb;
2186         struct ctdb_reply_control reply;
2187         uint32_t value;
2188         bool ret;
2189
2190         reply.rdata.opcode = request->opcode;
2191         reply.errmsg = NULL;
2192
2193         ret = ctdb_tunable_get_value(&ctdb->tun_list,
2194                                      request->rdata.data.tun_var, &value);
2195         if (! ret) {
2196                 reply.status = -1;
2197         } else {
2198                 reply.rdata.data.tun_value = value;
2199                 reply.status = 0;
2200         }
2201
2202         client_send_control(req, header, &reply);
2203 }
2204
2205 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2206                                   struct tevent_req *req,
2207                                   struct ctdb_req_header *header,
2208                                   struct ctdb_req_control *request)
2209 {
2210         struct ctdb_reply_control reply;
2211         struct ctdb_var_list *var_list;
2212
2213         reply.rdata.opcode = request->opcode;
2214         reply.errmsg = NULL;
2215
2216         var_list = ctdb_tunable_names(mem_ctx);
2217         if (var_list == NULL) {
2218                 reply.status = -1;
2219         } else {
2220                 reply.rdata.data.tun_var_list = var_list;
2221                 reply.status = 0;
2222         }
2223
2224         client_send_control(req, header, &reply);
2225 }
2226
2227 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2228                                  struct tevent_req *req,
2229                                  struct ctdb_req_header *header,
2230                                  struct ctdb_req_control *request)
2231 {
2232         struct client_state *state = tevent_req_data(
2233                 req, struct client_state);
2234         struct ctdbd_context *ctdb = state->ctdb;
2235         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2236         struct ctdb_reply_control reply;
2237         struct node *node;
2238
2239         reply.rdata.opcode = request->opcode;
2240
2241         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2242             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2243                 DEBUG(DEBUG_INFO,
2244                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2245                 reply.status = EINVAL;
2246                 reply.errmsg = "Failed to MODIFY_FLAGS";
2247                 client_send_control(req, header, &reply);
2248                 return;
2249         }
2250
2251         /* There's all sorts of broadcast weirdness here.  Only change
2252          * the specified node, not the destination node of the
2253          * control. */
2254         node = &ctdb->node_map->node[change->pnn];
2255
2256         if ((node->flags &
2257              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2258             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2259                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2260                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2261                 goto done;
2262         }
2263
2264         if ((node->flags &
2265              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2266             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2267                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2268                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2269                 goto done;
2270         }
2271
2272         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2273
2274 done:
2275         reply.status = 0;
2276         reply.errmsg = NULL;
2277         client_send_control(req, header, &reply);
2278 }
2279
2280 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2281                                      struct tevent_req *req,
2282                                      struct ctdb_req_header *header,
2283                                      struct ctdb_req_control *request)
2284 {
2285         struct client_state *state = tevent_req_data(
2286                 req, struct client_state);
2287         struct ctdbd_context *ctdb = state->ctdb;
2288         struct ctdb_reply_control reply;
2289
2290         reply.rdata.opcode = request->opcode;
2291         reply.rdata.data.tun_list = &ctdb->tun_list;
2292         reply.status = 0;
2293         reply.errmsg = NULL;
2294
2295         client_send_control(req, header, &reply);
2296 }
2297
2298 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2299                                          struct tevent_req *req,
2300                                          struct ctdb_req_header *header,
2301                                          struct ctdb_req_control *request)
2302 {
2303         struct client_state *state = tevent_req_data(
2304                 req, struct client_state);
2305         struct ctdbd_context *ctdb = state->ctdb;
2306         struct ctdb_reply_control reply;
2307         struct database *db;
2308
2309         reply.rdata.opcode = request->opcode;
2310
2311         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2312                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2313                         goto done;
2314                 }
2315         }
2316
2317         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2318                           CTDB_DB_FLAGS_PERSISTENT);
2319         if (db == NULL) {
2320                 reply.status = -1;
2321                 reply.errmsg = "Failed to attach database";
2322                 client_send_control(req, header, &reply);
2323                 return;
2324         }
2325
2326 done:
2327         reply.rdata.data.db_id = db->id;
2328         reply.status = 0;
2329         reply.errmsg = NULL;
2330         client_send_control(req, header, &reply);
2331 }
2332
2333 static void control_uptime(TALLOC_CTX *mem_ctx,
2334                            struct tevent_req *req,
2335                            struct ctdb_req_header *header,
2336                            struct ctdb_req_control *request)
2337 {
2338         struct client_state *state = tevent_req_data(
2339                 req, struct client_state);
2340         struct ctdbd_context *ctdb = state->ctdb;
2341         struct ctdb_reply_control reply;
2342         struct ctdb_uptime *uptime;;
2343
2344         reply.rdata.opcode = request->opcode;
2345
2346         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2347         if (uptime == NULL) {
2348                 goto fail;
2349         }
2350
2351         uptime->current_time = tevent_timeval_current();
2352         uptime->ctdbd_start_time = ctdb->start_time;
2353         uptime->last_recovery_started = ctdb->recovery_start_time;
2354         uptime->last_recovery_finished = ctdb->recovery_end_time;
2355
2356         reply.rdata.data.uptime = uptime;
2357         reply.status = 0;
2358         reply.errmsg = NULL;
2359         client_send_control(req, header, &reply);
2360         return;
2361
2362 fail:
2363         reply.status = -1;
2364         reply.errmsg = "Memory error";
2365         client_send_control(req, header, &reply);
2366 }
2367
2368 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2369                                       struct tevent_req *req,
2370                                       struct ctdb_req_header *header,
2371                                       struct ctdb_req_control *request)
2372 {
2373         struct client_state *state = tevent_req_data(
2374                 req, struct client_state);
2375         struct ctdbd_context *ctdb = state->ctdb;
2376         struct ctdb_reply_control reply;
2377         struct ctdb_node_map *nodemap;
2378         struct node_map *node_map = ctdb->node_map;
2379         int i;
2380
2381         reply.rdata.opcode = request->opcode;
2382
2383         nodemap = read_nodes_file(mem_ctx, header->destnode);
2384         if (nodemap == NULL) {
2385                 goto fail;
2386         }
2387
2388         for (i=0; i<nodemap->num; i++) {
2389                 struct node *node;
2390
2391                 if (i < node_map->num_nodes &&
2392                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2393                                         &node_map->node[i].addr)) {
2394                         continue;
2395                 }
2396
2397                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2398                         int ret;
2399
2400                         node = &node_map->node[i];
2401
2402                         node->flags |= NODE_FLAGS_DELETED;
2403                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2404                                                          false);
2405                         if (ret != 0) {
2406                                 /* Can't happen, but Coverity... */
2407                                 goto fail;
2408                         }
2409
2410                         continue;
2411                 }
2412
2413                 if (i < node_map->num_nodes &&
2414                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2415                         node = &node_map->node[i];
2416
2417                         node->flags &= ~NODE_FLAGS_DELETED;
2418                         node->addr = nodemap->node[i].addr;
2419
2420                         continue;
2421                 }
2422
2423                 node_map->node = talloc_realloc(node_map, node_map->node,
2424                                                 struct node,
2425                                                 node_map->num_nodes+1);
2426                 if (node_map->node == NULL) {
2427                         goto fail;
2428                 }
2429                 node = &node_map->node[node_map->num_nodes];
2430
2431                 node->addr = nodemap->node[i].addr;
2432                 node->pnn = nodemap->node[i].pnn;
2433                 node->flags = 0;
2434                 node->capabilities = CTDB_CAP_DEFAULT;
2435                 node->recovery_disabled = false;
2436                 node->recovery_substate = NULL;
2437
2438                 node_map->num_nodes += 1;
2439         }
2440
2441         talloc_free(nodemap);
2442
2443         reply.status = 0;
2444         reply.errmsg = NULL;
2445         client_send_control(req, header, &reply);
2446         return;
2447
2448 fail:
2449         reply.status = -1;
2450         reply.errmsg = "Memory error";
2451         client_send_control(req, header, &reply);
2452 }
2453
2454 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2455                                      struct tevent_req *req,
2456                                      struct ctdb_req_header *header,
2457                                      struct ctdb_req_control *request)
2458 {
2459         struct client_state *state = tevent_req_data(
2460                 req, struct client_state);
2461         struct ctdbd_context *ctdb = state->ctdb;
2462         struct ctdb_reply_control reply;
2463         struct node *node;
2464         uint32_t caps = 0;
2465
2466         reply.rdata.opcode = request->opcode;
2467
2468         node = &ctdb->node_map->node[header->destnode];
2469         caps = node->capabilities;
2470
2471         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2472                 /* Don't send reply */
2473                 return;
2474         }
2475
2476         reply.rdata.data.caps = caps;
2477         reply.status = 0;
2478         reply.errmsg = NULL;
2479
2480         client_send_control(req, header, &reply);
2481 }
2482
2483 static void control_release_ip(TALLOC_CTX *mem_ctx,
2484                                struct tevent_req *req,
2485                                struct ctdb_req_header *header,
2486                                struct ctdb_req_control *request)
2487 {
2488         struct client_state *state = tevent_req_data(
2489                 req, struct client_state);
2490         struct ctdbd_context *ctdb = state->ctdb;
2491         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2492         struct ctdb_reply_control reply;
2493         struct ctdb_public_ip_list *ips = NULL;
2494         struct ctdb_public_ip *t = NULL;
2495         int i;
2496
2497         reply.rdata.opcode = request->opcode;
2498
2499         if (ctdb->known_ips == NULL) {
2500                 D_INFO("RELEASE_IP %s - not a public IP\n",
2501                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2502                 goto done;
2503         }
2504
2505         ips = &ctdb->known_ips[header->destnode];
2506
2507         t = NULL;
2508         for (i = 0; i < ips->num; i++) {
2509                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2510                         t = &ips->ip[i];
2511                         break;
2512                 }
2513         }
2514         if (t == NULL) {
2515                 D_INFO("RELEASE_IP %s - not a public IP\n",
2516                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2517                 goto done;
2518         }
2519
2520         if (t->pnn != header->destnode) {
2521                 if (header->destnode == ip->pnn) {
2522                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2523                               ctdb_sock_addr_to_string(mem_ctx,
2524                                                        &ip->addr, false),
2525                               ip->pnn);
2526                         reply.status = -1;
2527                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2528                         client_send_control(req, header, &reply);
2529                         return;
2530                 }
2531
2532                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2533                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2534                        ip->pnn);
2535                 t->pnn = ip->pnn;
2536         } else {
2537                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2538                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2539                           ip->pnn);
2540                 t->pnn = ip->pnn;
2541         }
2542
2543 done:
2544         reply.status = 0;
2545         reply.errmsg = NULL;
2546         client_send_control(req, header, &reply);
2547 }
2548
2549 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2550                                 struct tevent_req *req,
2551                                 struct ctdb_req_header *header,
2552                                 struct ctdb_req_control *request)
2553 {
2554         struct client_state *state = tevent_req_data(
2555                 req, struct client_state);
2556         struct ctdbd_context *ctdb = state->ctdb;
2557         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2558         struct ctdb_reply_control reply;
2559         struct ctdb_public_ip_list *ips = NULL;
2560         struct ctdb_public_ip *t = NULL;
2561         int i;
2562
2563         reply.rdata.opcode = request->opcode;
2564
2565         if (ctdb->known_ips == NULL) {
2566                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2567                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2568                 goto done;
2569         }
2570
2571         ips = &ctdb->known_ips[header->destnode];
2572
2573         t = NULL;
2574         for (i = 0; i < ips->num; i++) {
2575                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2576                         t = &ips->ip[i];
2577                         break;
2578                 }
2579         }
2580         if (t == NULL) {
2581                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2582                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2583                 goto done;
2584         }
2585
2586         if (t->pnn == header->destnode) {
2587                 D_INFO("TAKEOVER_IP %s - redundant\n",
2588                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2589         } else {
2590                 D_NOTICE("TAKEOVER_IP %s\n",
2591                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2592                 t->pnn = ip->pnn;
2593         }
2594
2595 done:
2596         reply.status = 0;
2597         reply.errmsg = NULL;
2598         client_send_control(req, header, &reply);
2599 }
2600
2601 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2602                                    struct tevent_req *req,
2603                                    struct ctdb_req_header *header,
2604                                    struct ctdb_req_control *request)
2605 {
2606         struct client_state *state = tevent_req_data(
2607                 req, struct client_state);
2608         struct ctdbd_context *ctdb = state->ctdb;
2609         struct ctdb_reply_control reply;
2610         struct ctdb_public_ip_list *ips = NULL;
2611
2612         reply.rdata.opcode = request->opcode;
2613
2614         if (ctdb->known_ips == NULL) {
2615                 /* No IPs defined so create a dummy empty struct and ship it */
2616                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2617                 if (ips == NULL) {
2618                         reply.status = ENOMEM;
2619                         reply.errmsg = "Memory error";
2620                         goto done;
2621                 }
2622                 goto ok;
2623         }
2624
2625         ips = &ctdb->known_ips[header->destnode];
2626
2627         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2628                 /* If runstate is not RUNNING or a node is then return
2629                  * no available IPs.  Don't worry about interface
2630                  * states here - we're not faking down to that level.
2631                  */
2632                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
2633                         /* No available IPs: return dummy empty struct */
2634                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2635                         if (ips == NULL) {
2636                                 reply.status = ENOMEM;
2637                                 reply.errmsg = "Memory error";
2638                                 goto done;
2639                         }
2640                 }
2641         }
2642
2643 ok:
2644         reply.rdata.data.pubip_list = ips;
2645         reply.status = 0;
2646         reply.errmsg = NULL;
2647
2648 done:
2649         client_send_control(req, header, &reply);
2650 }
2651
2652 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2653                                 struct tevent_req *req,
2654                                 struct ctdb_req_header *header,
2655                                 struct ctdb_req_control *request)
2656 {
2657         struct client_state *state = tevent_req_data(
2658                 req, struct client_state);
2659         struct ctdbd_context *ctdb = state->ctdb;
2660         struct ctdb_reply_control reply;
2661         struct ctdb_node_map *nodemap;
2662         struct node *node;
2663         int i;
2664
2665         reply.rdata.opcode = request->opcode;
2666
2667         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2668         if (nodemap == NULL) {
2669                 goto fail;
2670         }
2671
2672         nodemap->num = ctdb->node_map->num_nodes;
2673         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2674                                      nodemap->num);
2675         if (nodemap->node == NULL) {
2676                 goto fail;
2677         }
2678
2679         for (i=0; i<nodemap->num; i++) {
2680                 node = &ctdb->node_map->node[i];
2681                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2682                         .pnn = node->pnn,
2683                         .flags = node->flags,
2684                         .addr = node->addr,
2685                 };
2686         }
2687
2688         reply.rdata.data.nodemap = nodemap;
2689         reply.status = 0;
2690         reply.errmsg = NULL;
2691         client_send_control(req, header, &reply);
2692         return;
2693
2694 fail:
2695         reply.status = -1;
2696         reply.errmsg = "Memory error";
2697         client_send_control(req, header, &reply);
2698 }
2699
2700 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2701                                      struct tevent_req *req,
2702                                      struct ctdb_req_header *header,
2703                                      struct ctdb_req_control *request)
2704 {
2705         struct client_state *state = tevent_req_data(
2706                 req, struct client_state);
2707         struct ctdbd_context *ctdb = state->ctdb;
2708         struct ctdb_reply_control reply;
2709
2710         reply.rdata.opcode = request->opcode;
2711
2712         if (ctdb->reclock != NULL) {
2713                 reply.rdata.data.reclock_file =
2714                         talloc_strdup(mem_ctx, ctdb->reclock);
2715                 if (reply.rdata.data.reclock_file == NULL) {
2716                         reply.status = ENOMEM;
2717                         reply.errmsg = "Memory error";
2718                         goto done;
2719                 }
2720         } else {
2721                 reply.rdata.data.reclock_file = NULL;
2722         }
2723
2724         reply.status = 0;
2725         reply.errmsg = NULL;
2726
2727 done:
2728         client_send_control(req, header, &reply);
2729 }
2730
2731 static void control_stop_node(TALLOC_CTX *mem_ctx,
2732                               struct tevent_req *req,
2733                               struct ctdb_req_header *header,
2734                               struct ctdb_req_control *request)
2735 {
2736         struct client_state *state = tevent_req_data(
2737                 req, struct client_state);
2738         struct ctdbd_context *ctdb = state->ctdb;
2739         struct ctdb_reply_control reply;
2740
2741         reply.rdata.opcode = request->opcode;
2742
2743         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2744         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2745
2746         reply.status = 0;
2747         reply.errmsg = NULL;
2748
2749         client_send_control(req, header, &reply);
2750         return;
2751 }
2752
2753 static void control_continue_node(TALLOC_CTX *mem_ctx,
2754                                   struct tevent_req *req,
2755                                   struct ctdb_req_header *header,
2756                                   struct ctdb_req_control *request)
2757 {
2758         struct client_state *state = tevent_req_data(
2759                 req, struct client_state);
2760         struct ctdbd_context *ctdb = state->ctdb;
2761         struct ctdb_reply_control reply;
2762
2763         reply.rdata.opcode = request->opcode;
2764
2765         DEBUG(DEBUG_INFO, ("Continue node\n"));
2766         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2767
2768         reply.status = 0;
2769         reply.errmsg = NULL;
2770
2771         client_send_control(req, header, &reply);
2772         return;
2773 }
2774
2775 static void set_ban_state_callback(struct tevent_req *subreq)
2776 {
2777         struct node *node = tevent_req_callback_data(
2778                 subreq, struct node);
2779         bool status;
2780
2781         status = tevent_wakeup_recv(subreq);
2782         TALLOC_FREE(subreq);
2783         if (! status) {
2784                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2785         }
2786
2787         node->flags &= ~NODE_FLAGS_BANNED;
2788 }
2789
2790 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2791                                   struct tevent_req *req,
2792                                   struct ctdb_req_header *header,
2793                                   struct ctdb_req_control *request)
2794 {
2795         struct client_state *state = tevent_req_data(
2796                 req, struct client_state);
2797         struct tevent_req *subreq;
2798         struct ctdbd_context *ctdb = state->ctdb;
2799         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2800         struct ctdb_reply_control reply;
2801         struct node *node;
2802
2803         reply.rdata.opcode = request->opcode;
2804
2805         if (ban->pnn != header->destnode) {
2806                 DEBUG(DEBUG_INFO,
2807                       ("SET_BAN_STATE control for PNN %d rejected\n",
2808                        ban->pnn));
2809                 reply.status = EINVAL;
2810                 goto fail;
2811         }
2812
2813         node = &ctdb->node_map->node[header->destnode];
2814
2815         if (ban->time == 0) {
2816                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2817                 node->flags &= ~NODE_FLAGS_BANNED;
2818                 goto done;
2819         }
2820
2821         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2822                                     tevent_timeval_current_ofs(
2823                                             ban->time, 0));
2824         if (subreq == NULL) {
2825                 reply.status = ENOMEM;
2826                 goto fail;
2827         }
2828         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2829
2830         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2831         node->flags |= NODE_FLAGS_BANNED;
2832         ctdb->vnn_map->generation = INVALID_GENERATION;
2833
2834 done:
2835         reply.status = 0;
2836         reply.errmsg = NULL;
2837
2838         client_send_control(req, header, &reply);
2839         return;
2840
2841 fail:
2842         reply.errmsg = "Failed to ban node";
2843 }
2844
2845 static void control_trans3_commit(TALLOC_CTX *mem_ctx,
2846                                   struct tevent_req *req,
2847                                   struct ctdb_req_header *header,
2848                                   struct ctdb_req_control *request)
2849 {
2850         struct client_state *state = tevent_req_data(
2851                 req, struct client_state);
2852         struct ctdbd_context *ctdb = state->ctdb;
2853         struct ctdb_reply_control reply;
2854         struct database *db;
2855         int ret;
2856
2857         reply.rdata.opcode = request->opcode;
2858
2859         db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
2860         if (db == NULL) {
2861                 reply.status = -1;
2862                 reply.errmsg = "Unknown database";
2863                 client_send_control(req, header, &reply);
2864                 return;
2865         }
2866
2867         if (! (db->flags &
2868                (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
2869                 reply.status = -1;
2870                 reply.errmsg = "Transactions on volatile database";
2871                 client_send_control(req, header, &reply);
2872                 return;
2873         }
2874
2875         ret = ltdb_transaction(db, request->rdata.data.recbuf);
2876         if (ret != 0) {
2877                 reply.status = -1;
2878                 reply.errmsg = "Transaction failed";
2879                 client_send_control(req, header, &reply);
2880                 return;
2881         }
2882
2883         reply.status = 0;
2884         reply.errmsg = NULL;
2885         client_send_control(req, header, &reply);
2886 }
2887
2888 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2889                                struct tevent_req *req,
2890                                struct ctdb_req_header *header,
2891                                struct ctdb_req_control *request)
2892 {
2893         struct client_state *state = tevent_req_data(
2894                 req, struct client_state);
2895         struct ctdbd_context *ctdb = state->ctdb;
2896         struct ctdb_reply_control reply;
2897         struct database *db;
2898         int ret;
2899
2900         reply.rdata.opcode = request->opcode;
2901
2902         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2903         if (db == NULL) {
2904                 reply.status = ENOENT;
2905                 reply.errmsg = "Database not found";
2906         } else {
2907                 uint64_t seqnum;
2908
2909                 ret = database_seqnum(db, &seqnum);
2910                 if (ret == 0) {
2911                         reply.rdata.data.seqnum = seqnum;
2912                         reply.status = 0;
2913                         reply.errmsg = NULL;
2914                 } else {
2915                         reply.status = ret;
2916                         reply.errmsg = "Failed to get seqnum";
2917                 }
2918         }
2919
2920         client_send_control(req, header, &reply);
2921 }
2922
2923 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2924                                   struct tevent_req *req,
2925                                   struct ctdb_req_header *header,
2926                                   struct ctdb_req_control *request)
2927 {
2928         struct client_state *state = tevent_req_data(
2929                 req, struct client_state);
2930         struct ctdbd_context *ctdb = state->ctdb;
2931         struct ctdb_reply_control reply;
2932         struct database *db;
2933
2934         reply.rdata.opcode = request->opcode;
2935
2936         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2937         if (db == NULL) {
2938                 reply.status = ENOENT;
2939                 reply.errmsg = "Database not found";
2940         } else {
2941                 reply.rdata.data.reason = NULL;
2942                 reply.status = 0;
2943                 reply.errmsg = NULL;
2944         }
2945
2946         client_send_control(req, header, &reply);
2947 }
2948
2949 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
2950                                                    struct ctdbd_context *ctdb)
2951 {
2952         struct ctdb_iface_list *iface_list;
2953         struct interface *iface;
2954         int i;
2955
2956         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2957         if (iface_list == NULL) {
2958                 goto done;
2959         }
2960
2961         iface_list->num = ctdb->iface_map->num;
2962         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2963                                          iface_list->num);
2964         if (iface_list->iface == NULL) {
2965                 TALLOC_FREE(iface_list);
2966                 goto done;
2967         }
2968
2969         for (i=0; i<iface_list->num; i++) {
2970                 iface = &ctdb->iface_map->iface[i];
2971                 iface_list->iface[i] = (struct ctdb_iface) {
2972                         .link_state = iface->link_up,
2973                         .references = iface->references,
2974                 };
2975                 strlcpy(iface_list->iface[i].name, iface->name,
2976                         sizeof(iface_list->iface[i].name));
2977         }
2978
2979 done:
2980         return iface_list;
2981 }
2982
2983 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
2984                                        struct tevent_req *req,
2985                                        struct ctdb_req_header *header,
2986                                        struct ctdb_req_control *request)
2987 {
2988         struct client_state *state = tevent_req_data(
2989                 req, struct client_state);
2990         struct ctdbd_context *ctdb = state->ctdb;
2991         struct ctdb_reply_control reply;
2992         ctdb_sock_addr *addr = request->rdata.data.addr;
2993         struct ctdb_public_ip_list *known = NULL;
2994         struct ctdb_public_ip_info *info = NULL;
2995         unsigned i;
2996
2997         reply.rdata.opcode = request->opcode;
2998
2999         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
3000         if (info == NULL) {
3001                 reply.status = ENOMEM;
3002                 reply.errmsg = "Memory error";
3003                 goto done;
3004         }
3005
3006         reply.rdata.data.ipinfo = info;
3007
3008         if (ctdb->known_ips != NULL) {
3009                 known = &ctdb->known_ips[header->destnode];
3010         } else {
3011                 /* No IPs defined so create a dummy empty struct and
3012                  * fall through.  The given IP won't be matched
3013                  * below...
3014                  */
3015                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
3016                 if (known == NULL) {
3017                         reply.status = ENOMEM;
3018                         reply.errmsg = "Memory error";
3019                         goto done;
3020                 }
3021         }
3022
3023         for (i = 0; i < known->num; i++) {
3024                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
3025                                            addr)) {
3026                         break;
3027                 }
3028         }
3029
3030         if (i == known->num) {
3031                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
3032                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
3033                 reply.status = -1;
3034                 reply.errmsg = "Unknown address";
3035                 goto done;
3036         }
3037
3038         info->ip = known->ip[i];
3039
3040         /* The fake PUBLICIPS stanza and resulting known_ips data
3041          * don't know anything about interfaces, so completely fake
3042          * this.
3043          */
3044         info->active_idx = 0;
3045
3046         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
3047         if (info->ifaces == NULL) {
3048                 reply.status = ENOMEM;
3049                 reply.errmsg = "Memory error";
3050                 goto done;
3051         }
3052
3053         reply.status = 0;
3054         reply.errmsg = NULL;
3055
3056 done:
3057         client_send_control(req, header, &reply);
3058 }
3059
3060 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
3061                                struct tevent_req *req,
3062                                struct ctdb_req_header *header,
3063                                struct ctdb_req_control *request)
3064 {
3065         struct client_state *state = tevent_req_data(
3066                 req, struct client_state);
3067         struct ctdbd_context *ctdb = state->ctdb;
3068         struct ctdb_reply_control reply;
3069         struct ctdb_iface_list *iface_list;
3070
3071         reply.rdata.opcode = request->opcode;
3072
3073         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
3074         if (iface_list == NULL) {
3075                 goto fail;
3076         }
3077
3078         reply.rdata.data.iface_list = iface_list;
3079         reply.status = 0;
3080         reply.errmsg = NULL;
3081         client_send_control(req, header, &reply);
3082         return;
3083
3084 fail:
3085         reply.status = -1;
3086         reply.errmsg = "Memory error";
3087         client_send_control(req, header, &reply);
3088 }
3089
3090 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
3091                                          struct tevent_req *req,
3092                                          struct ctdb_req_header *header,
3093                                          struct ctdb_req_control *request)
3094 {
3095         struct client_state *state = tevent_req_data(
3096                 req, struct client_state);
3097         struct ctdbd_context *ctdb = state->ctdb;
3098         struct ctdb_reply_control reply;
3099         struct ctdb_iface *in_iface;
3100         struct interface *iface = NULL;
3101         bool link_up = false;
3102         int i;
3103
3104         reply.rdata.opcode = request->opcode;
3105
3106         in_iface = request->rdata.data.iface;
3107
3108         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
3109                 reply.errmsg = "interface name not terminated";
3110                 goto fail;
3111         }
3112
3113         switch (in_iface->link_state) {
3114                 case 0:
3115                         link_up = false;
3116                         break;
3117
3118                 case 1:
3119                         link_up = true;
3120                         break;
3121
3122                 default:
3123                         reply.errmsg = "invalid link state";
3124                         goto fail;
3125         }
3126
3127         if (in_iface->references != 0) {
3128                 reply.errmsg = "references should be 0";
3129                 goto fail;
3130         }
3131
3132         for (i=0; i<ctdb->iface_map->num; i++) {
3133                 if (strcmp(ctdb->iface_map->iface[i].name,
3134                            in_iface->name) == 0) {
3135                         iface = &ctdb->iface_map->iface[i];
3136                         break;
3137                 }
3138         }
3139
3140         if (iface == NULL) {
3141                 reply.errmsg = "interface not found";
3142                 goto fail;
3143         }
3144
3145         iface->link_up = link_up;
3146
3147         reply.status = 0;
3148         reply.errmsg = NULL;
3149         client_send_control(req, header, &reply);
3150         return;
3151
3152 fail:
3153         reply.status = -1;
3154         client_send_control(req, header, &reply);
3155 }
3156
3157 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
3158                                     struct tevent_req *req,
3159                                     struct ctdb_req_header *header,
3160                                     struct ctdb_req_control *request)
3161 {
3162         struct client_state *state = tevent_req_data(
3163                 req, struct client_state);
3164         struct ctdbd_context *ctdb = state->ctdb;
3165         struct ctdb_reply_control reply;
3166         struct database *db;
3167
3168         reply.rdata.opcode = request->opcode;
3169
3170         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3171         if (db == NULL) {
3172                 reply.status = ENOENT;
3173                 reply.errmsg = "Database not found";
3174                 goto done;
3175         }
3176
3177         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3178                 reply.status = EINVAL;
3179                 reply.errmsg = "Can not set READONLY on persistent db";
3180                 goto done;
3181         }
3182
3183         db->flags |= CTDB_DB_FLAGS_READONLY;
3184         reply.status = 0;
3185         reply.errmsg = NULL;
3186
3187 done:
3188         client_send_control(req, header, &reply);
3189 }
3190
/*
 * Per-traverse context handed to traverse_start_ext_handler() through
 * the private_data pointer of tdb_traverse_read().
 */
struct traverse_start_ext_state {
        /* Client request and header used to send each record message */
        struct tevent_req *req;
        struct ctdb_req_header *header;
        /* Request id copied into every record sent to the client */
        uint32_t reqid;
        /* Service id the record messages are sent to */
        uint64_t srvid;
        /* When false, records consisting only of an ltdb header are skipped */
        bool withemptyrecords;
        /* Set to ENOMEM by the handler if a message buffer can't be allocated */
        int status;
};
3199
3200 static int traverse_start_ext_handler(struct tdb_context *tdb,
3201                                       TDB_DATA key, TDB_DATA data,
3202                                       void *private_data)
3203 {
3204         struct traverse_start_ext_state *state =
3205                 (struct traverse_start_ext_state *)private_data;
3206         struct ctdb_rec_data rec;
3207         struct ctdb_req_message_data message;
3208         size_t np;
3209
3210         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3211                 return 0;
3212         }
3213
3214         if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
3215             (!state->withemptyrecords)) {
3216                 return 0;
3217         }
3218
3219         rec = (struct ctdb_rec_data) {
3220                 .reqid = state->reqid,
3221                 .header = NULL,
3222                 .key = key,
3223                 .data = data,
3224         };
3225
3226         message.srvid = state->srvid;
3227         message.data.dsize = ctdb_rec_data_len(&rec);
3228         message.data.dptr = talloc_size(state->req, message.data.dsize);
3229         if (message.data.dptr == NULL) {
3230                 state->status = ENOMEM;
3231                 return 1;
3232         }
3233
3234         ctdb_rec_data_push(&rec, message.data.dptr, &np);
3235         client_send_message(state->req, state->header, &message);
3236
3237         talloc_free(message.data.dptr);
3238
3239         return 0;
3240 }
3241
3242 static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
3243                                        struct tevent_req *req,
3244                                        struct ctdb_req_header *header,
3245                                        struct ctdb_req_control *request)
3246 {
3247         struct client_state *state = tevent_req_data(
3248                 req, struct client_state);
3249         struct ctdbd_context *ctdb = state->ctdb;
3250         struct ctdb_reply_control reply;
3251         struct database *db;
3252         struct ctdb_traverse_start_ext *ext;
3253         struct traverse_start_ext_state t_state;
3254         struct ctdb_rec_data rec;
3255         struct ctdb_req_message_data message;
3256         uint8_t buffer[32];
3257         size_t np;
3258         int ret;
3259
3260         reply.rdata.opcode = request->opcode;
3261
3262         ext = request->rdata.data.traverse_start_ext;
3263
3264         db = database_find(ctdb->db_map, ext->db_id);
3265         if (db == NULL) {
3266                 reply.status = -1;
3267                 reply.errmsg = "Unknown database";
3268                 client_send_control(req, header, &reply);
3269                 return;
3270         }
3271
3272         t_state = (struct traverse_start_ext_state) {
3273                 .req = req,
3274                 .header = header,
3275                 .reqid = ext->reqid,
3276                 .srvid = ext->srvid,
3277                 .withemptyrecords = ext->withemptyrecords,
3278         };
3279
3280         ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
3281         DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
3282         if (t_state.status != 0) {
3283                 reply.status = -1;
3284                 reply.errmsg = "Memory error";
3285                 client_send_control(req, header, &reply);
3286         }
3287
3288         reply.status = 0;
3289         client_send_control(req, header, &reply);
3290
3291         rec = (struct ctdb_rec_data) {
3292                 .reqid = ext->reqid,
3293                 .header = NULL,
3294                 .key = tdb_null,
3295                 .data = tdb_null,
3296         };
3297
3298         message.srvid = ext->srvid;
3299         message.data.dsize = ctdb_rec_data_len(&rec);
3300         ctdb_rec_data_push(&rec, buffer, &np);
3301         message.data.dptr = buffer;
3302         client_send_message(req, header, &message);
3303 }
3304
3305 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3306                                     struct tevent_req *req,
3307                                     struct ctdb_req_header *header,
3308                                     struct ctdb_req_control *request)
3309 {
3310         struct client_state *state = tevent_req_data(
3311                 req, struct client_state);
3312         struct ctdbd_context *ctdb = state->ctdb;
3313         struct ctdb_reply_control reply;
3314         struct database *db;
3315
3316         reply.rdata.opcode = request->opcode;
3317
3318         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3319         if (db == NULL) {
3320                 reply.status = ENOENT;
3321                 reply.errmsg = "Database not found";
3322                 goto done;
3323         }
3324
3325         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3326                 reply.status = EINVAL;
3327                 reply.errmsg = "Can not set STICKY on persistent db";
3328                 goto done;
3329         }
3330
3331         db->flags |= CTDB_DB_FLAGS_STICKY;
3332         reply.status = 0;
3333         reply.errmsg = NULL;
3334
3335 done:
3336         client_send_control(req, header, &reply);
3337 }
3338
3339 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3340                                   struct tevent_req *req,
3341                                   struct ctdb_req_header *header,
3342                                   struct ctdb_req_control *request)
3343 {
3344         struct ctdb_reply_control reply;
3345
3346         /* Always succeed */
3347         reply.rdata.opcode = request->opcode;
3348         reply.status = 0;
3349         reply.errmsg = NULL;
3350
3351         client_send_control(req, header, &reply);
3352 }
3353
3354 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3355                                  struct tevent_req *req,
3356                                  struct ctdb_req_header *header,
3357                                  struct ctdb_req_control *request)
3358 {
3359         struct client_state *state = tevent_req_data(
3360                 req, struct client_state);
3361         struct ctdbd_context *ctdb = state->ctdb;
3362         struct ctdb_reply_control reply;
3363
3364         reply.rdata.opcode = request->opcode;
3365         reply.rdata.data.runstate = ctdb->runstate;
3366         reply.status = 0;
3367         reply.errmsg = NULL;
3368
3369         client_send_control(req, header, &reply);
3370 }
3371
3372 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3373                                    struct tevent_req *req,
3374                                    struct ctdb_req_header *header,
3375                                    struct ctdb_req_control *request)
3376 {
3377         struct ctdb_reply_control reply;
3378         struct ctdb_node_map *nodemap;
3379
3380         reply.rdata.opcode = request->opcode;
3381
3382         nodemap = read_nodes_file(mem_ctx, header->destnode);
3383         if (nodemap == NULL) {
3384                 goto fail;
3385         }
3386
3387         reply.rdata.data.nodemap = nodemap;
3388         reply.status = 0;
3389         reply.errmsg = NULL;
3390         client_send_control(req, header, &reply);
3391         return;
3392
3393 fail:
3394         reply.status = -1;
3395         reply.errmsg = "Failed to read nodes file";
3396         client_send_control(req, header, &reply);
3397 }
3398
3399 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3400                                   struct tevent_req *req,
3401                                   struct ctdb_req_header *header,
3402                                   struct ctdb_req_control *request)
3403 {
3404         struct client_state *state = tevent_req_data(
3405                 req, struct client_state);
3406         struct ctdbd_context *ctdb = state->ctdb;
3407         struct ctdb_reply_control reply;
3408         struct database *db;
3409
3410         reply.rdata.opcode = request->opcode;
3411
3412         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3413         if (db == NULL) {
3414                 reply.status = ENOENT;
3415                 reply.errmsg = "Database not found";
3416         } else {
3417                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3418                 reply.status = 0;
3419                 reply.errmsg = NULL;
3420         }
3421
3422         client_send_control(req, header, &reply);
3423 }
3424
3425 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3426                                          struct tevent_req *req,
3427                                          struct ctdb_req_header *header,
3428                                          struct ctdb_req_control *request)
3429 {
3430         struct client_state *state = tevent_req_data(
3431                 req, struct client_state);
3432         struct ctdbd_context *ctdb = state->ctdb;
3433         struct ctdb_reply_control reply;
3434         struct database *db;
3435
3436         reply.rdata.opcode = request->opcode;
3437
3438         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3439                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3440                         goto done;
3441                 }
3442         }
3443
3444         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3445                           CTDB_DB_FLAGS_REPLICATED);
3446         if (db == NULL) {
3447                 reply.status = -1;
3448                 reply.errmsg = "Failed to attach database";
3449                 client_send_control(req, header, &reply);
3450                 return;
3451         }
3452
3453 done:
3454         reply.rdata.data.db_id = db->id;
3455         reply.status = 0;
3456         reply.errmsg = NULL;
3457         client_send_control(req, header, &reply);
3458 }
3459
3460 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3461                                     struct tevent_req *req,
3462                                     struct ctdb_req_header *header,
3463                                     struct ctdb_req_control *request)
3464 {
3465         struct client_state *state = tevent_req_data(
3466                 req, struct client_state);
3467         struct ctdbd_context *ctdb = state->ctdb;
3468         struct ctdb_client *client;
3469         struct client_state *cstate;
3470         struct ctdb_reply_control reply;
3471         bool pid_found, srvid_found;
3472         int ret;
3473
3474         reply.rdata.opcode = request->opcode;
3475
3476         pid_found = false;
3477         srvid_found = false;
3478
3479         for (client=ctdb->client_list; client != NULL; client=client->next) {
3480                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3481                         pid_found = true;
3482                         cstate = (struct client_state *)client->state;
3483                         ret = srvid_exists(ctdb->srv,
3484                                            request->rdata.data.pid_srvid->srvid,
3485                                            cstate);
3486                         if (ret == 0) {
3487                                 srvid_found = true;
3488                                 ret = kill(cstate->pid, 0);
3489                                 if (ret != 0) {
3490                                         reply.status = ret;
3491                                         reply.errmsg = strerror(errno);
3492                                 } else {
3493                                         reply.status = 0;
3494                                         reply.errmsg = NULL;
3495                                 }
3496                         }
3497                 }
3498         }
3499
3500         if (! pid_found) {
3501                 reply.status = -1;
3502                 reply.errmsg = "No client for PID";
3503         } else if (! srvid_found) {
3504                 reply.status = -1;
3505                 reply.errmsg = "No client for PID and SRVID";
3506         }
3507
3508         client_send_control(req, header, &reply);
3509 }
3510
3511 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3512                                  struct tevent_req *req,
3513                                  struct ctdb_req_header *header,
3514                                  struct ctdb_req_control *request)
3515 {
3516         struct client_state *state = tevent_req_data(
3517                 req, struct client_state);
3518         struct ctdbd_context *ctdb = state->ctdb;
3519         struct ctdb_reply_control reply;
3520         struct fake_control_failure *f = NULL;
3521
3522         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3523                 request->opcode, header->destnode);
3524         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3525                 if (f->opcode == request->opcode &&
3526                     (f->pnn == header->destnode ||
3527                      f->pnn == CTDB_UNKNOWN_PNN)) {
3528
3529                         reply.rdata.opcode = request->opcode;
3530                         if (strcmp(f->error, "TIMEOUT") == 0) {
3531                                 /* Causes no reply */
3532                                 D_ERR("Control %u fake timeout on node %u\n",
3533                                       request->opcode, header->destnode);
3534                                 return true;
3535                         } else if (strcmp(f->error, "ERROR") == 0) {
3536                                 D_ERR("Control %u fake error on node %u\n",
3537                                       request->opcode, header->destnode);
3538                                 reply.status = -1;
3539                                 reply.errmsg = f->comment;
3540                                 client_send_control(req, header, &reply);
3541                                 return true;
3542                         }
3543                 }
3544         }
3545
3546         return false;
3547 }
3548
3549 static void control_error(TALLOC_CTX *mem_ctx,
3550                           struct tevent_req *req,
3551                           struct ctdb_req_header *header,
3552                           struct ctdb_req_control *request)
3553 {
3554         struct ctdb_reply_control reply;
3555
3556         D_DEBUG("Control %u not implemented\n", request->opcode);
3557
3558         reply.rdata.opcode = request->opcode;
3559         reply.status = -1;
3560         reply.errmsg = "Not implemented";
3561
3562         client_send_control(req, header, &reply);
3563 }
3564
3565 /*
3566  * Handling protocol - messages
3567  */
3568
/*
 * State attached to the wakeup timer that re-enables recoveries on a
 * node once a "disable recoveries" request expires.
 */
struct disable_recoveries_state {
        /* Node whose recovery_disabled flag is cleared on expiry */
        struct node *node;
};
3572
3573 static void disable_recoveries_callback(struct tevent_req *subreq)
3574 {
3575         struct disable_recoveries_state *substate = tevent_req_callback_data(
3576                 subreq, struct disable_recoveries_state);
3577         bool status;
3578
3579         status = tevent_wakeup_recv(subreq);
3580         TALLOC_FREE(subreq);
3581         if (! status) {
3582                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3583         }
3584
3585         substate->node->recovery_disabled = false;
3586         TALLOC_FREE(substate->node->recovery_substate);
3587 }
3588
3589 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3590                                        struct tevent_req *req,
3591                                        struct ctdb_req_header *header,
3592                                        struct ctdb_req_message *request)
3593 {
3594         struct client_state *state = tevent_req_data(
3595                 req, struct client_state);
3596         struct tevent_req *subreq;
3597         struct ctdbd_context *ctdb = state->ctdb;
3598         struct disable_recoveries_state *substate;
3599         struct ctdb_disable_message *disable = request->data.disable;
3600         struct ctdb_req_message_data reply;
3601         struct node *node;
3602         int ret = -1;
3603         TDB_DATA data;
3604
3605         node = &ctdb->node_map->node[header->destnode];
3606
3607         if (disable->timeout == 0) {
3608                 TALLOC_FREE(node->recovery_substate);
3609                 node->recovery_disabled = false;
3610                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3611                                    header->destnode));
3612                 goto done;
3613         }
3614
3615         substate = talloc_zero(ctdb->node_map,
3616                                struct disable_recoveries_state);
3617         if (substate == NULL) {
3618                 goto fail;
3619         }
3620
3621         substate->node = node;
3622
3623         subreq = tevent_wakeup_send(substate, state->ev,
3624                                     tevent_timeval_current_ofs(
3625                                             disable->timeout, 0));
3626         if (subreq == NULL) {
3627                 talloc_free(substate);
3628                 goto fail;
3629         }
3630         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3631
3632         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3633                            disable->timeout, header->destnode));
3634         node->recovery_substate = substate;
3635         node->recovery_disabled = true;
3636
3637 done:
3638         ret = header->destnode;
3639
3640 fail:
3641         reply.srvid = disable->srvid;
3642         data.dptr = (uint8_t *)&ret;
3643         data.dsize = sizeof(int);
3644         reply.data = data;
3645
3646         client_send_message(req, header, &reply);
3647 }
3648
3649 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3650                                  struct tevent_req *req,
3651                                  struct ctdb_req_header *header,
3652                                  struct ctdb_req_message *request)
3653 {
3654         struct client_state *state = tevent_req_data(
3655                 req, struct client_state);
3656         struct ctdbd_context *ctdb = state->ctdb;
3657         struct ctdb_srvid_message *srvid = request->data.msg;
3658         struct ctdb_req_message_data reply;
3659         int ret = -1;
3660         TDB_DATA data;
3661
3662         if (header->destnode != ctdb->node_map->recmaster) {
3663                 /* No reply! Only recmaster replies... */
3664                 return;
3665         }
3666
3667         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3668                            header->destnode));
3669         ret = header->destnode;
3670
3671         reply.srvid = srvid->srvid;
3672         data.dptr = (uint8_t *)&ret;
3673         data.dsize = sizeof(int);
3674         reply.data = data;
3675
3676         client_send_message(req, header, &reply);
3677 }
3678
3679 /*
3680  * Handle a single client
3681  */
3682
/* comm callbacks registered by client_send() */
static void client_read_handler(uint8_t *buf, size_t buflen,
                                void *private_data);
static void client_dead_handler(void *private_data);

/* Per-packet dispatch, then one handler per request type */
static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen);
static void client_process_call(struct tevent_req *req,
                                uint8_t *buf, size_t buflen);
static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);

static void client_reply_done(struct tevent_req *subreq);
3695
3696 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3697                                       struct tevent_context *ev,
3698                                       int fd, struct ctdbd_context *ctdb,
3699                                       int pnn)
3700 {
3701         struct tevent_req *req;
3702         struct client_state *state;
3703         struct ucred cr;
3704         socklen_t crl = sizeof(struct ucred);
3705         int ret;
3706
3707         req = tevent_req_create(mem_ctx, &state, struct client_state);
3708         if (req == NULL) {
3709                 return NULL;
3710         }
3711
3712         state->ev = ev;
3713         state->fd = fd;
3714         state->ctdb = ctdb;
3715         state->pnn = pnn;
3716
3717         ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
3718         if (ret != 0) {
3719                 tevent_req_error(req, ret);
3720                 return tevent_req_post(req, ev);
3721         }
3722         state->pid = cr.pid;
3723
3724         ret = comm_setup(state, ev, fd, client_read_handler, req,
3725                          client_dead_handler, req, &state->comm);
3726         if (ret != 0) {
3727                 tevent_req_error(req, ret);
3728                 return tevent_req_post(req, ev);
3729         }
3730
3731         ret = client_add(ctdb, state->pid, state);
3732         if (ret != 0) {
3733                 tevent_req_error(req, ret);
3734                 return tevent_req_post(req, ev);
3735         }
3736
3737         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3738
3739         return req;
3740 }
3741
3742 static void client_read_handler(uint8_t *buf, size_t buflen,
3743                                 void *private_data)
3744 {
3745         struct tevent_req *req = talloc_get_type_abort(
3746                 private_data, struct tevent_req);
3747         struct client_state *state = tevent_req_data(
3748                 req, struct client_state);
3749         struct ctdbd_context *ctdb = state->ctdb;
3750         struct ctdb_req_header header;
3751         size_t np;
3752         int ret, i;
3753
3754         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3755         if (ret != 0) {
3756                 return;
3757         }
3758
3759         if (buflen != header.length) {
3760                 return;
3761         }
3762
3763         ret = ctdb_req_header_verify(&header, 0);
3764         if (ret != 0) {
3765                 return;
3766         }
3767
3768         header_fix_pnn(&header, ctdb);
3769
3770         if (header.destnode == CTDB_BROADCAST_ALL) {
3771                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3772                         header.destnode = i;
3773
3774                         ctdb_req_header_push(&header, buf, &np);
3775                         client_process_packet(req, buf, buflen);
3776                 }
3777                 return;
3778         }
3779
3780         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3781                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3782                         if (ctdb->node_map->node[i].flags &
3783                             NODE_FLAGS_DISCONNECTED) {
3784                                 continue;
3785                         }
3786
3787                         header.destnode = i;
3788
3789                         ctdb_req_header_push(&header, buf, &np);
3790                         client_process_packet(req, buf, buflen);
3791                 }
3792                 return;
3793         }
3794
3795         if (header.destnode > ctdb->node_map->num_nodes) {
3796                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3797                         header.destnode);
3798                 return;
3799         }
3800
3801
3802         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3803                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3804                         header.destnode);
3805                 return;
3806         }
3807
3808         ctdb_req_header_push(&header, buf, &np);
3809         client_process_packet(req, buf, buflen);
3810 }
3811
3812 static void client_dead_handler(void *private_data)
3813 {
3814         struct tevent_req *req = talloc_get_type_abort(
3815                 private_data, struct tevent_req);
3816
3817         tevent_req_done(req);
3818 }
3819
3820 static void client_process_packet(struct tevent_req *req,
3821                                   uint8_t *buf, size_t buflen)
3822 {
3823         struct ctdb_req_header header;
3824         size_t np;
3825         int ret;
3826
3827         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3828         if (ret != 0) {
3829                 return;
3830         }
3831
3832         switch (header.operation) {
3833         case CTDB_REQ_CALL:
3834                 client_process_call(req, buf, buflen);
3835                 break;
3836
3837         case CTDB_REQ_MESSAGE:
3838                 client_process_message(req, buf, buflen);
3839                 break;
3840
3841         case CTDB_REQ_CONTROL:
3842                 client_process_control(req, buf, buflen);
3843                 break;
3844
3845         default:
3846                 break;
3847         }
3848 }
3849
3850 static void client_process_call(struct tevent_req *req,
3851                                 uint8_t *buf, size_t buflen)
3852 {
3853         struct client_state *state = tevent_req_data(
3854                 req, struct client_state);
3855         struct ctdbd_context *ctdb = state->ctdb;
3856         TALLOC_CTX *mem_ctx;
3857         struct ctdb_req_header header;
3858         struct ctdb_req_call request;
3859         struct ctdb_reply_call reply;
3860         struct database *db;
3861         struct ctdb_ltdb_header hdr;
3862         TDB_DATA data;
3863         int ret;
3864
3865         mem_ctx = talloc_new(state);
3866         if (tevent_req_nomem(mem_ctx, req)) {
3867                 return;
3868         }
3869
3870         ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
3871         if (ret != 0) {
3872                 talloc_free(mem_ctx);
3873                 tevent_req_error(req, ret);
3874                 return;
3875         }
3876
3877         header_fix_pnn(&header, ctdb);
3878
3879         if (header.destnode >= ctdb->node_map->num_nodes) {
3880                 goto fail;
3881         }
3882
3883         DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
3884
3885         db = database_find(ctdb->db_map, request.db_id);
3886         if (db == NULL) {
3887                 goto fail;
3888         }
3889
3890         ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
3891         if (ret != 0) {
3892                 goto fail;
3893         }
3894
3895         /* Fake migration */
3896         if (hdr.dmaster != ctdb->node_map->pnn) {
3897                 hdr.dmaster = ctdb->node_map->pnn;
3898
3899                 ret = ltdb_store(db, request.key, &hdr, data);
3900                 if (ret != 0) {
3901                         goto fail;
3902                 }
3903         }
3904
3905         talloc_free(mem_ctx);
3906
3907         reply.status = 0;
3908         reply.data = tdb_null;
3909
3910         client_send_call(req, &header, &reply);
3911         return;
3912
3913 fail:
3914         talloc_free(mem_ctx);
3915         reply.status = -1;
3916         reply.data = tdb_null;
3917
3918         client_send_call(req, &header, &reply);
3919 }
3920
3921 static void client_process_message(struct tevent_req *req,
3922                                    uint8_t *buf, size_t buflen)
3923 {
3924         struct client_state *state = tevent_req_data(
3925                 req, struct client_state);
3926         struct ctdbd_context *ctdb = state->ctdb;
3927         TALLOC_CTX *mem_ctx;
3928         struct ctdb_req_header header;
3929         struct ctdb_req_message request;
3930         uint64_t srvid;
3931         int ret;
3932
3933         mem_ctx = talloc_new(state);
3934         if (tevent_req_nomem(mem_ctx, req)) {
3935                 return;
3936         }
3937
3938         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
3939         if (ret != 0) {
3940                 talloc_free(mem_ctx);
3941                 tevent_req_error(req, ret);
3942                 return;
3943         }
3944
3945         header_fix_pnn(&header, ctdb);
3946
3947         if (header.destnode >= ctdb->node_map->num_nodes) {
3948                 /* Many messages are not replied to, so just behave as
3949                  * though this message was not received */
3950                 fprintf(stderr, "Invalid node %d\n", header.destnode);
3951                 talloc_free(mem_ctx);
3952                 return;
3953         }
3954
3955         srvid = request.srvid;
3956         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
3957
3958         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
3959                 message_disable_recoveries(mem_ctx, req, &header, &request);
3960         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
3961                 message_takeover_run(mem_ctx, req, &header, &request);
3962         } else {
3963                 D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
3964         }
3965
3966         /* check srvid */
3967         talloc_free(mem_ctx);
3968 }
3969
3970 static void client_process_control(struct tevent_req *req,
3971                                    uint8_t *buf, size_t buflen)
3972 {
3973         struct client_state *state = tevent_req_data(
3974                 req, struct client_state);
3975         struct ctdbd_context *ctdb = state->ctdb;
3976         TALLOC_CTX *mem_ctx;
3977         struct ctdb_req_header header;
3978         struct ctdb_req_control request;
3979         int ret;
3980
3981         mem_ctx = talloc_new(state);
3982         if (tevent_req_nomem(mem_ctx, req)) {
3983                 return;
3984         }
3985
3986         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
3987         if (ret != 0) {
3988                 talloc_free(mem_ctx);
3989                 tevent_req_error(req, ret);
3990                 return;
3991         }
3992
3993         header_fix_pnn(&header, ctdb);
3994
3995         if (header.destnode >= ctdb->node_map->num_nodes) {
3996                 struct ctdb_reply_control reply;
3997
3998                 reply.rdata.opcode = request.opcode;
3999                 reply.errmsg = "Invalid node";
4000                 reply.status = -1;
4001                 client_send_control(req, &header, &reply);
4002                 return;
4003         }
4004
4005         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
4006                            request.opcode, header.reqid));
4007
4008         if (fake_control_failure(mem_ctx, req, &header, &request)) {
4009                 goto done;
4010         }
4011
4012         switch (request.opcode) {
4013         case CTDB_CONTROL_PROCESS_EXISTS:
4014                 control_process_exists(mem_ctx, req, &header, &request);
4015                 break;
4016
4017         case CTDB_CONTROL_PING:
4018                 control_ping(mem_ctx, req, &header, &request);
4019                 break;
4020
4021         case CTDB_CONTROL_GETDBPATH:
4022                 control_getdbpath(mem_ctx, req, &header, &request);
4023                 break;
4024
4025         case CTDB_CONTROL_GETVNNMAP:
4026                 control_getvnnmap(mem_ctx, req, &header, &request);
4027                 break;
4028
4029         case CTDB_CONTROL_GET_DEBUG:
4030                 control_get_debug(mem_ctx, req, &header, &request);
4031                 break;
4032
4033         case CTDB_CONTROL_SET_DEBUG:
4034                 control_set_debug(mem_ctx, req, &header, &request);
4035                 break;
4036
4037         case CTDB_CONTROL_GET_DBMAP:
4038                 control_get_dbmap(mem_ctx, req, &header, &request);
4039                 break;
4040
4041         case CTDB_CONTROL_GET_RECMODE:
4042                 control_get_recmode(mem_ctx, req, &header, &request);
4043                 break;
4044
4045         case CTDB_CONTROL_SET_RECMODE:
4046                 control_set_recmode(mem_ctx, req, &header, &request);
4047                 break;
4048
4049         case CTDB_CONTROL_DB_ATTACH:
4050                 control_db_attach(mem_ctx, req, &header, &request);
4051                 break;
4052
4053         case CTDB_CONTROL_REGISTER_SRVID:
4054                 control_register_srvid(mem_ctx, req, &header, &request);
4055                 break;
4056
4057         case CTDB_CONTROL_DEREGISTER_SRVID:
4058                 control_deregister_srvid(mem_ctx, req, &header, &request);
4059                 break;
4060
4061         case CTDB_CONTROL_GET_DBNAME:
4062                 control_get_dbname(mem_ctx, req, &header, &request);
4063                 break;
4064
4065         case CTDB_CONTROL_GET_PID:
4066                 control_get_pid(mem_ctx, req, &header, &request);
4067                 break;
4068
4069         case CTDB_CONTROL_GET_RECMASTER:
4070                 control_get_recmaster(mem_ctx, req, &header, &request);
4071                 break;
4072
4073         case CTDB_CONTROL_GET_PNN:
4074                 control_get_pnn(mem_ctx, req, &header, &request);
4075                 break;
4076
4077         case CTDB_CONTROL_SHUTDOWN:
4078                 control_shutdown(mem_ctx, req, &header, &request);
4079                 break;
4080
4081         case CTDB_CONTROL_SET_TUNABLE:
4082                 control_set_tunable(mem_ctx, req, &header, &request);
4083                 break;
4084
4085         case CTDB_CONTROL_GET_TUNABLE:
4086                 control_get_tunable(mem_ctx, req, &header, &request);
4087                 break;
4088
4089         case CTDB_CONTROL_LIST_TUNABLES:
4090                 control_list_tunables(mem_ctx, req, &header, &request);
4091                 break;
4092
4093         case CTDB_CONTROL_MODIFY_FLAGS:
4094                 control_modify_flags(mem_ctx, req, &header, &request);
4095                 break;
4096
4097         case CTDB_CONTROL_GET_ALL_TUNABLES:
4098                 control_get_all_tunables(mem_ctx, req, &header, &request);
4099                 break;
4100
4101         case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
4102                 control_db_attach_persistent(mem_ctx, req, &header, &request);
4103                 break;
4104
4105         case CTDB_CONTROL_UPTIME:
4106                 control_uptime(mem_ctx, req, &header, &request);
4107                 break;
4108
4109         case CTDB_CONTROL_RELOAD_NODES_FILE:
4110                 control_reload_nodes_file(mem_ctx, req, &header, &request);
4111                 break;
4112
4113         case CTDB_CONTROL_GET_CAPABILITIES:
4114                 control_get_capabilities(mem_ctx, req, &header, &request);
4115                 break;
4116
4117         case CTDB_CONTROL_RELEASE_IP:
4118                 control_release_ip(mem_ctx, req, &header, &request);
4119                 break;
4120
4121         case CTDB_CONTROL_TAKEOVER_IP:
4122                 control_takeover_ip(mem_ctx, req, &header, &request);
4123                 break;
4124
4125         case CTDB_CONTROL_GET_PUBLIC_IPS:
4126                 control_get_public_ips(mem_ctx, req, &header, &request);
4127                 break;
4128
4129         case CTDB_CONTROL_GET_NODEMAP:
4130                 control_get_nodemap(mem_ctx, req, &header, &request);
4131                 break;
4132
4133         case CTDB_CONTROL_GET_RECLOCK_FILE:
4134                 control_get_reclock_file(mem_ctx, req, &header, &request);
4135                 break;
4136
4137         case CTDB_CONTROL_STOP_NODE:
4138                 control_stop_node(mem_ctx, req, &header, &request);
4139                 break;
4140
4141         case CTDB_CONTROL_CONTINUE_NODE:
4142                 control_continue_node(mem_ctx, req, &header, &request);
4143                 break;
4144
4145         case CTDB_CONTROL_SET_BAN_STATE:
4146                 control_set_ban_state(mem_ctx, req, &header, &request);
4147                 break;
4148
4149         case CTDB_CONTROL_TRANS3_COMMIT:
4150                 control_trans3_commit(mem_ctx, req, &header, &request);
4151                 break;
4152
4153         case CTDB_CONTROL_GET_DB_SEQNUM:
4154                 control_get_db_seqnum(mem_ctx, req, &header, &request);
4155                 break;
4156
4157         case CTDB_CONTROL_DB_GET_HEALTH:
4158                 control_db_get_health(mem_ctx, req, &header, &request);
4159                 break;
4160
4161         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
4162                 control_get_public_ip_info(mem_ctx, req, &header, &request);
4163                 break;
4164
4165         case CTDB_CONTROL_GET_IFACES:
4166                 control_get_ifaces(mem_ctx, req, &header, &request);
4167                 break;
4168
4169         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
4170                 control_set_iface_link_state(mem_ctx, req, &header, &request);
4171                 break;
4172
4173         case CTDB_CONTROL_SET_DB_READONLY:
4174                 control_set_db_readonly(mem_ctx, req, &header, &request);
4175                 break;
4176
4177         case CTDB_CONTROL_TRAVERSE_START_EXT:
4178                 control_traverse_start_ext(mem_ctx, req, &header, &request);
4179                 break;
4180
4181         case CTDB_CONTROL_SET_DB_STICKY:
4182                 control_set_db_sticky(mem_ctx, req, &header, &request);
4183                 break;
4184
4185         case CTDB_CONTROL_IPREALLOCATED:
4186                 control_ipreallocated(mem_ctx, req, &header, &request);
4187                 break;
4188
4189         case CTDB_CONTROL_GET_RUNSTATE:
4190                 control_get_runstate(mem_ctx, req, &header, &request);
4191                 break;
4192
4193         case CTDB_CONTROL_GET_NODES_FILE:
4194                 control_get_nodes_file(mem_ctx, req, &header, &request);
4195                 break;
4196
4197         case CTDB_CONTROL_DB_OPEN_FLAGS:
4198                 control_db_open_flags(mem_ctx, req, &header, &request);
4199                 break;
4200
4201         case CTDB_CONTROL_DB_ATTACH_REPLICATED:
4202                 control_db_attach_replicated(mem_ctx, req, &header, &request);
4203                 break;
4204
4205         case CTDB_CONTROL_CHECK_PID_SRVID:
4206                 control_check_pid_srvid(mem_ctx, req, &header, &request);
4207                 break;
4208
4209         default:
4210                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
4211                         control_error(mem_ctx, req, &header, &request);
4212                 }
4213                 break;
4214         }
4215
4216 done:
4217         talloc_free(mem_ctx);
4218 }
4219
4220 static int client_recv(struct tevent_req *req, int *perr)
4221 {
4222         struct client_state *state = tevent_req_data(
4223                 req, struct client_state);
4224         int err;
4225
4226         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
4227         close(state->fd);
4228
4229         if (tevent_req_is_unix_error(req, &err)) {
4230                 if (perr != NULL) {
4231                         *perr = err;
4232                 }
4233                 return -1;
4234         }
4235
4236         return state->status;
4237 }
4238
4239 /*
4240  * Fake CTDB server
4241  */
4242
/* State of the accept loop that makes up the fake server */
struct server_state {
        struct tevent_context *ev;      /* event context used for all requests */
        struct ctdbd_context *ctdb;     /* fake daemon state shared by clients */
        int fd;                         /* listening socket passed to accept_send() */
};
4248
/* Completion callbacks for the accept request and per-client requests */
static void server_new_client(struct tevent_req *subreq);
static void server_client_done(struct tevent_req *subreq);
4251
4252 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
4253                                       struct tevent_context *ev,
4254                                       struct ctdbd_context *ctdb,
4255                                       int fd)
4256 {
4257         struct tevent_req *req, *subreq;
4258         struct server_state *state;
4259
4260         req = tevent_req_create(mem_ctx, &state, struct server_state);
4261         if (req == NULL) {
4262                 return NULL;
4263         }
4264
4265         state->ev = ev;
4266         state->ctdb = ctdb;
4267         state->fd = fd;
4268
4269         subreq = accept_send(state, ev, fd);
4270         if (tevent_req_nomem(subreq, req)) {
4271                 return tevent_req_post(req, ev);
4272         }
4273         tevent_req_set_callback(subreq, server_new_client, req);
4274
4275         return req;
4276 }
4277
4278 static void server_new_client(struct tevent_req *subreq)
4279 {
4280         struct tevent_req *req = tevent_req_callback_data(
4281                 subreq, struct tevent_req);
4282         struct server_state *state = tevent_req_data(
4283                 req, struct server_state);
4284         struct ctdbd_context *ctdb = state->ctdb;
4285         int client_fd;
4286         int ret = 0;
4287
4288         client_fd = accept_recv(subreq, NULL, NULL, &ret);
4289         TALLOC_FREE(subreq);
4290         if (client_fd == -1) {
4291                 tevent_req_error(req, ret);
4292                 return;
4293         }
4294
4295         subreq = client_send(state, state->ev, client_fd,
4296                              ctdb, ctdb->node_map->pnn);
4297         if (tevent_req_nomem(subreq, req)) {
4298                 return;
4299         }
4300         tevent_req_set_callback(subreq, server_client_done, req);
4301
4302         ctdb->num_clients += 1;
4303
4304         subreq = accept_send(state, state->ev, state->fd);
4305         if (tevent_req_nomem(subreq, req)) {
4306                 return;
4307         }
4308         tevent_req_set_callback(subreq, server_new_client, req);
4309 }
4310
4311 static void server_client_done(struct tevent_req *subreq)
4312 {
4313         struct tevent_req *req = tevent_req_callback_data(
4314                 subreq, struct tevent_req);
4315         struct server_state *state = tevent_req_data(
4316                 req, struct server_state);
4317         struct ctdbd_context *ctdb = state->ctdb;
4318         int ret = 0;
4319         int status;
4320
4321         status = client_recv(subreq, &ret);
4322         TALLOC_FREE(subreq);
4323         if (status < 0) {
4324                 tevent_req_error(req, ret);
4325                 return;
4326         }
4327
4328         ctdb->num_clients -= 1;
4329
4330         if (status == 99) {
4331                 /* Special status, to shutdown server */
4332                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
4333                 tevent_req_done(req);
4334         }
4335 }
4336
/*
 * Collect the result of the server request.
 *
 * Returns true if the server finished without error.  On error returns
 * false and stores the error code in *perr (when perr is not NULL);
 * *perr is left untouched on success.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
4349
4350 /*
4351  * Main functions
4352  */
4353
4354 static int socket_init(const char *sockpath)
4355 {
4356         struct sockaddr_un addr;
4357         size_t len;
4358         int ret, fd;
4359
4360         memset(&addr, 0, sizeof(addr));
4361         addr.sun_family = AF_UNIX;
4362
4363         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
4364         if (len >= sizeof(addr.sun_path)) {
4365                 fprintf(stderr, "path too long: %s\n", sockpath);
4366                 return -1;
4367         }
4368
4369         fd = socket(AF_UNIX, SOCK_STREAM, 0);
4370         if (fd == -1) {
4371                 fprintf(stderr, "socket failed - %s\n", sockpath);
4372                 return -1;
4373         }
4374
4375         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
4376         if (ret != 0) {
4377                 fprintf(stderr, "bind failed - %s\n", sockpath);
4378                 goto fail;
4379         }
4380
4381         ret = listen(fd, 10);
4382         if (ret != 0) {
4383                 fprintf(stderr, "listen failed\n");
4384                 goto fail;
4385         }
4386
4387         DEBUG(DEBUG_INFO, ("Socket init done\n"));
4388
4389         return fd;
4390
4391 fail:
4392         if (fd != -1) {
4393                 close(fd);
4394         }
4395         return -1;
4396 }
4397
/* Command line settings, filled in by popt via cmdline_options[] */
static struct options {
        const char *dbdir;              /* --dbdir / -D */
        const char *sockpath;           /* --socket / -s */
        const char *pidfile;            /* --pidfile / -p */
        const char *debuglevel;         /* --debug / -d */
} options;
4404
4405 static struct poptOption cmdline_options[] = {
4406         POPT_AUTOHELP
4407         { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
4408                 "Database directory", "directory" },
4409         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
4410                 "Unix domain socket path", "filename" },
4411         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
4412                 "pid file", "filename" } ,
4413         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
4414                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
4415         POPT_TABLEEND
4416 };
4417
4418 static void cleanup(void)
4419 {
4420         unlink(options.sockpath);
4421         unlink(options.pidfile);
4422 }
4423
/*
 * SIGTERM handler: remove the socket and pid file, then terminate with
 * status 0.
 *
 * _exit() is used instead of exit() because exit() is not
 * async-signal-safe: it flushes stdio and runs atexit handlers, which can
 * deadlock or corrupt state if the signal interrupted libc.  start_server()
 * registers cleanup() with atexit(); it is run explicitly here instead, so
 * it also no longer runs twice.
 */
static void signal_handler(int sig)
{
        (void) sig;     /* only installed for SIGTERM; value not needed */

        cleanup();
        _exit(0);
}
4429
4430 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
4431                          struct ctdbd_context *ctdb, int fd, int pfd)
4432 {
4433         struct tevent_req *req;
4434         int ret = 0;
4435         ssize_t len;
4436
4437         atexit(cleanup);
4438         signal(SIGTERM, signal_handler);
4439
4440         req = server_send(mem_ctx, ev, ctdb, fd);
4441         if (req == NULL) {
4442                 fprintf(stderr, "Memory error\n");
4443                 exit(1);
4444         }
4445
4446         len = write(pfd, &ret, sizeof(ret));
4447         if (len != sizeof(ret)) {
4448                 fprintf(stderr, "Failed to send message to parent\n");
4449                 exit(1);
4450         }
4451         close(pfd);
4452
4453         tevent_req_poll(req, ev);
4454
4455         server_recv(req, &ret);
4456         if (ret != 0) {
4457                 exit(1);
4458         }
4459 }
4460
4461 int main(int argc, const char *argv[])
4462 {
4463         TALLOC_CTX *mem_ctx;
4464         struct ctdbd_context *ctdb;
4465         struct tevent_context *ev;
4466         poptContext pc;
4467         int opt, fd, ret, pfd[2];
4468         ssize_t len;
4469         pid_t pid;
4470         FILE *fp;
4471
4472         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
4473                             POPT_CONTEXT_KEEP_FIRST);
4474         while ((opt = poptGetNextOpt(pc)) != -1) {
4475                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
4476                 exit(1);
4477         }
4478
4479         if (options.dbdir == NULL) {
4480                 fprintf(stderr, "Please specify database directory\n");
4481                 poptPrintHelp(pc, stdout, 0);
4482                 exit(1);
4483         }
4484
4485         if (options.sockpath == NULL) {
4486                 fprintf(stderr, "Please specify socket path\n");
4487                 poptPrintHelp(pc, stdout, 0);
4488                 exit(1);
4489         }
4490
4491         if (options.pidfile == NULL) {
4492                 fprintf(stderr, "Please specify pid file\n");
4493                 poptPrintHelp(pc, stdout, 0);
4494                 exit(1);
4495         }
4496
4497         mem_ctx = talloc_new(NULL);
4498         if (mem_ctx == NULL) {
4499                 fprintf(stderr, "Memory error\n");
4500                 exit(1);
4501         }
4502
4503         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4504         if (ret != 0) {
4505                 fprintf(stderr, "Invalid debug level\n");
4506                 poptPrintHelp(pc, stdout, 0);
4507                 exit(1);
4508         }
4509
4510         ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4511         if (ctdb == NULL) {
4512                 exit(1);
4513         }
4514
4515         if (! ctdbd_verify(ctdb)) {
4516                 exit(1);
4517         }
4518
4519         ev = tevent_context_init(mem_ctx);
4520         if (ev == NULL) {
4521                 fprintf(stderr, "Memory error\n");
4522                 exit(1);
4523         }
4524
4525         fd = socket_init(options.sockpath);
4526         if (fd == -1) {
4527                 exit(1);
4528         }
4529
4530         ret = pipe(pfd);
4531         if (ret != 0) {
4532                 fprintf(stderr, "Failed to create pipe\n");
4533                 cleanup();
4534                 exit(1);
4535         }
4536
4537         pid = fork();
4538         if (pid == -1) {
4539                 fprintf(stderr, "Failed to fork\n");
4540                 cleanup();
4541                 exit(1);
4542         }
4543
4544         if (pid == 0) {
4545                 /* Child */
4546                 close(pfd[0]);
4547                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4548                 exit(1);
4549         }
4550
4551         /* Parent */
4552         close(pfd[1]);
4553
4554         len = read(pfd[0], &ret, sizeof(ret));
4555         close(pfd[0]);
4556         if (len != sizeof(ret)) {
4557                 fprintf(stderr, "len = %zi\n", len);
4558                 fprintf(stderr, "Failed to get message from child\n");
4559                 kill(pid, SIGTERM);
4560                 exit(1);
4561         }
4562
4563         fp = fopen(options.pidfile, "w");
4564         if (fp == NULL) {
4565                 fprintf(stderr, "Failed to open pid file %s\n",
4566                         options.pidfile);
4567                 kill(pid, SIGTERM);
4568                 exit(1);
4569         }
4570         fprintf(fp, "%d\n", pid);
4571         fclose(fp);
4572
4573         return 0;
4574 }