libndr: Avoid assigning duplicate versions to symbols
[amitay/samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
35
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
40
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45 #include "common/system.h"
46
47 #include "ipalloc_read_known_ips.h"
48
49
50 #define CTDB_PORT 4379
51
52 /* A fake flag that is only supported by some functions */
53 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
54
55 struct node {
56         ctdb_sock_addr addr;
57         uint32_t pnn;
58         uint32_t flags;
59         uint32_t capabilities;
60         bool recovery_disabled;
61         void *recovery_substate;
62 };
63
64 struct node_map {
65         uint32_t num_nodes;
66         struct node *node;
67         uint32_t pnn;
68         uint32_t recmaster;
69 };
70
71 struct interface {
72         const char *name;
73         bool link_up;
74         uint32_t references;
75 };
76
77 struct interface_map {
78         int num;
79         struct interface *iface;
80 };
81
82 struct vnn_map {
83         uint32_t recmode;
84         uint32_t generation;
85         uint32_t size;
86         uint32_t *map;
87 };
88
89 struct database {
90         struct database *prev, *next;
91         const char *name;
92         const char *path;
93         struct tdb_context *tdb;
94         uint32_t id;
95         uint8_t flags;
96         uint64_t seq_num;
97 };
98
99 struct database_map {
100         struct database *db;
101         const char *dbdir;
102 };
103
104 struct fake_control_failure {
105         struct fake_control_failure  *prev, *next;
106         enum ctdb_controls opcode;
107         uint32_t pnn;
108         const char *error;
109         const char *comment;
110 };
111
112 struct ctdb_client {
113         struct ctdb_client *prev, *next;
114         struct ctdbd_context *ctdb;
115         pid_t pid;
116         void *state;
117 };
118
119 struct ctdbd_context {
120         struct node_map *node_map;
121         struct interface_map *iface_map;
122         struct vnn_map *vnn_map;
123         struct database_map *db_map;
124         struct srvid_context *srv;
125         int num_clients;
126         struct timeval start_time;
127         struct timeval recovery_start_time;
128         struct timeval recovery_end_time;
129         bool takeover_disabled;
130         int log_level;
131         enum ctdb_runstate runstate;
132         struct ctdb_tunable_list tun_list;
133         char *reclock;
134         struct ctdb_public_ip_list *known_ips;
135         struct fake_control_failure *control_failures;
136         struct ctdb_client *client_list;
137 };
138
139 /*
140  * Parse routines
141  */
142
143 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
144 {
145         struct node_map *node_map;
146
147         node_map = talloc_zero(mem_ctx, struct node_map);
148         if (node_map == NULL) {
149                 return NULL;
150         }
151
152         node_map->pnn = CTDB_UNKNOWN_PNN;
153         node_map->recmaster = CTDB_UNKNOWN_PNN;
154
155         return node_map;
156 }
157
158 /* Read a nodemap from stdin.  Each line looks like:
159  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
160  * EOF or a blank line terminates input.
161  *
162  * By default, capablities for each node are
163  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
164  * capabilities can be faked off by adding, for example,
165  * -CTDB_CAP_RECMASTER.
166  */
167
168 static bool nodemap_parse(struct node_map *node_map)
169 {
170         char line[1024];
171
172         while ((fgets(line, sizeof(line), stdin) != NULL)) {
173                 uint32_t pnn, flags, capabilities;
174                 char *tok, *t;
175                 char *ip;
176                 ctdb_sock_addr saddr;
177                 struct node *node;
178                 int ret;
179
180                 if (line[0] == '\n') {
181                         break;
182                 }
183
184                 /* Get rid of pesky newline */
185                 if ((t = strchr(line, '\n')) != NULL) {
186                         *t = '\0';
187                 }
188
189                 /* Get PNN */
190                 tok = strtok(line, " \t");
191                 if (tok == NULL) {
192                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
193                         continue;
194                 }
195                 pnn = (uint32_t)strtoul(tok, NULL, 0);
196
197                 /* Get IP */
198                 tok = strtok(NULL, " \t");
199                 if (tok == NULL) {
200                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
201                         continue;
202                 }
203                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
204                 if (ret != 0) {
205                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
206                         continue;
207                 }
208                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
209                 ip = talloc_strdup(node_map, tok);
210                 if (ip == NULL) {
211                         goto fail;
212                 }
213
214                 /* Get flags */
215                 tok = strtok(NULL, " \t");
216                 if (tok == NULL) {
217                         fprintf(stderr, "bad line (%s) - missing flags\n",
218                                 line);
219                         continue;
220                 }
221                 flags = (uint32_t)strtoul(tok, NULL, 0);
222                 /* Handle deleted nodes */
223                 if (flags & NODE_FLAGS_DELETED) {
224                         talloc_free(ip);
225                         ip = talloc_strdup(node_map, "0.0.0.0");
226                         if (ip == NULL) {
227                                 goto fail;
228                         }
229                 }
230                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
231
232                 tok = strtok(NULL, " \t");
233                 while (tok != NULL) {
234                         if (strcmp(tok, "CURRENT") == 0) {
235                                 node_map->pnn = pnn;
236                         } else if (strcmp(tok, "RECMASTER") == 0) {
237                                 node_map->recmaster = pnn;
238                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
239                                 capabilities &= ~CTDB_CAP_RECMASTER;
240                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
241                                 capabilities &= ~CTDB_CAP_LMASTER;
242                         } else if (strcmp(tok, "TIMEOUT") == 0) {
243                                 /* This can be done with just a flag
244                                  * value but it is probably clearer
245                                  * and less error-prone to fake this
246                                  * with an explicit token */
247                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
248                         }
249                         tok = strtok(NULL, " \t");
250                 }
251
252                 node_map->node = talloc_realloc(node_map, node_map->node,
253                                                 struct node,
254                                                 node_map->num_nodes + 1);
255                 if (node_map->node == NULL) {
256                         goto fail;
257                 }
258                 node = &node_map->node[node_map->num_nodes];
259
260                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
261                 if (ret != 0) {
262                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
263                         continue;
264                 }
265                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
266                 node->pnn = pnn;
267                 node->flags = flags;
268                 node->capabilities = capabilities;
269                 node->recovery_disabled = false;
270                 node->recovery_substate = NULL;
271
272                 node_map->num_nodes += 1;
273         }
274
275         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
276         return true;
277
278 fail:
279         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
280         return false;
281
282 }
283
284 /* Append a node to a node map with given address and flags */
285 static bool node_map_add(struct ctdb_node_map *nodemap,
286                          const char *nstr, uint32_t flags)
287 {
288         ctdb_sock_addr addr;
289         uint32_t num;
290         struct ctdb_node_and_flags *n;
291         int ret;
292
293         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
294         if (ret != 0) {
295                 fprintf(stderr, "Invalid IP address %s\n", nstr);
296                 return false;
297         }
298         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
299
300         num = nodemap->num;
301         nodemap->node = talloc_realloc(nodemap, nodemap->node,
302                                        struct ctdb_node_and_flags, num+1);
303         if (nodemap->node == NULL) {
304                 return false;
305         }
306
307         n = &nodemap->node[num];
308         n->addr = addr;
309         n->pnn = num;
310         n->flags = flags;
311
312         nodemap->num = num+1;
313         return true;
314 }
315
316 /* Read a nodes file into a node map */
317 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
318                                                   const char *nlist)
319 {
320         char **lines;
321         int nlines;
322         int i;
323         struct ctdb_node_map *nodemap;
324
325         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
326         if (nodemap == NULL) {
327                 return NULL;
328         }
329
330         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
331         if (lines == NULL) {
332                 return NULL;
333         }
334
335         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
336                 nlines--;
337         }
338
339         for (i=0; i<nlines; i++) {
340                 char *node;
341                 uint32_t flags;
342                 size_t len;
343
344                 node = lines[i];
345                 /* strip leading spaces */
346                 while((*node == ' ') || (*node == '\t')) {
347                         node++;
348                 }
349
350                 len = strlen(node);
351
352                 /* strip trailing spaces */
353                 while ((len > 1) &&
354                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
355                 {
356                         node[len-1] = '\0';
357                         len--;
358                 }
359
360                 if (len == 0) {
361                         continue;
362                 }
363                 if (*node == '#') {
364                         /* A "deleted" node is a node that is
365                            commented out in the nodes file.  This is
366                            used instead of removing a line, which
367                            would cause subsequent nodes to change
368                            their PNN. */
369                         flags = NODE_FLAGS_DELETED;
370                         node = discard_const("0.0.0.0");
371                 } else {
372                         flags = 0;
373                 }
374                 if (! node_map_add(nodemap, node, flags)) {
375                         talloc_free(lines);
376                         TALLOC_FREE(nodemap);
377                         return NULL;
378                 }
379         }
380
381         talloc_free(lines);
382         return nodemap;
383 }
384
385 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
386                                              uint32_t pnn)
387 {
388         struct ctdb_node_map *nodemap;
389         char nodes_list[PATH_MAX];
390         const char *ctdb_base;
391         int num;
392
393         ctdb_base = getenv("CTDB_BASE");
394         if (ctdb_base == NULL) {
395                 D_ERR("CTDB_BASE is not set\n");
396                 return NULL;
397         }
398
399         /* read optional node-specific nodes file */
400         num = snprintf(nodes_list, sizeof(nodes_list),
401                        "%s/nodes.%d", ctdb_base, pnn);
402         if (num == sizeof(nodes_list)) {
403                 D_ERR("nodes file path too long\n");
404                 return NULL;
405         }
406         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
407         if (nodemap != NULL) {
408                 /* Fake a load failure for an empty nodemap */
409                 if (nodemap->num == 0) {
410                         talloc_free(nodemap);
411
412                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
413                         return NULL;
414                 }
415
416                 return nodemap;
417         }
418
419         /* read normal nodes file */
420         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
421         if (num == sizeof(nodes_list)) {
422                 D_ERR("nodes file path too long\n");
423                 return NULL;
424         }
425         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
426         if (nodemap != NULL) {
427                 return nodemap;
428         }
429
430         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
431         return NULL;
432 }
433
434 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
435 {
436         struct interface_map *iface_map;
437
438         iface_map = talloc_zero(mem_ctx, struct interface_map);
439         if (iface_map == NULL) {
440                 return NULL;
441         }
442
443         return iface_map;
444 }
445
446 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
447  * output:
448  *   :Name:LinkStatus:References:
449  *   :eth2:1:4294967294
450  *   :eth1:1:4294967292
451  */
452
453 static bool interfaces_parse(struct interface_map *iface_map)
454 {
455         char line[1024];
456
457         while ((fgets(line, sizeof(line), stdin) != NULL)) {
458                 uint16_t link_state;
459                 uint32_t references;
460                 char *tok, *t, *name;
461                 struct interface *iface;
462
463                 if (line[0] == '\n') {
464                         break;
465                 }
466
467                 /* Get rid of pesky newline */
468                 if ((t = strchr(line, '\n')) != NULL) {
469                         *t = '\0';
470                 }
471
472                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
473                         continue;
474                 }
475
476                 /* Leading colon... */
477                 // tok = strtok(line, ":");
478
479                 /* name */
480                 tok = strtok(line, ":");
481                 if (tok == NULL) {
482                         fprintf(stderr, "bad line (%s) - missing name\n", line);
483                         continue;
484                 }
485                 name = tok;
486
487                 /* link_state */
488                 tok = strtok(NULL, ":");
489                 if (tok == NULL) {
490                         fprintf(stderr, "bad line (%s) - missing link state\n",
491                                 line);
492                         continue;
493                 }
494                 link_state = (uint16_t)strtoul(tok, NULL, 0);
495
496                 /* references... */
497                 tok = strtok(NULL, ":");
498                 if (tok == NULL) {
499                         fprintf(stderr, "bad line (%s) - missing references\n",
500                                 line);
501                         continue;
502                 }
503                 references = (uint32_t)strtoul(tok, NULL, 0);
504
505                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
506                                                   struct interface,
507                                                   iface_map->num + 1);
508                 if (iface_map->iface == NULL) {
509                         goto fail;
510                 }
511
512                 iface = &iface_map->iface[iface_map->num];
513
514                 iface->name = talloc_strdup(iface_map, name);
515                 if (iface->name == NULL) {
516                         goto fail;
517                 }
518                 iface->link_up = link_state;
519                 iface->references = references;
520
521                 iface_map->num += 1;
522         }
523
524         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
525         return true;
526
527 fail:
528         fprintf(stderr, "Parsing interfaces failed\n");
529         return false;
530 }
531
532 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
533 {
534         struct vnn_map *vnn_map;
535
536         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
537         if (vnn_map == NULL) {
538                 fprintf(stderr, "Memory error\n");
539                 return NULL;
540         }
541         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
542         vnn_map->generation = INVALID_GENERATION;
543
544         return vnn_map;
545 }
546
547 /* Read vnn map.
548  * output:
549  *   <GENERATION>
550  *   <LMASTER0>
551  *   <LMASTER1>
552  *   ...
553  */
554
555 static bool vnnmap_parse(struct vnn_map *vnn_map)
556 {
557         char line[1024];
558
559         while (fgets(line, sizeof(line), stdin) != NULL) {
560                 uint32_t n;
561                 char *t;
562
563                 if (line[0] == '\n') {
564                         break;
565                 }
566
567                 /* Get rid of pesky newline */
568                 if ((t = strchr(line, '\n')) != NULL) {
569                         *t = '\0';
570                 }
571
572                 n = (uint32_t) strtol(line, NULL, 0);
573
574                 /* generation */
575                 if (vnn_map->generation == INVALID_GENERATION) {
576                         vnn_map->generation = n;
577                         continue;
578                 }
579
580                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
581                                               vnn_map->size + 1);
582                 if (vnn_map->map == NULL) {
583                         fprintf(stderr, "Memory error\n");
584                         goto fail;
585                 }
586
587                 vnn_map->map[vnn_map->size] = n;
588                 vnn_map->size += 1;
589         }
590
591         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
592         return true;
593
594 fail:
595         fprintf(stderr, "Parsing vnnmap failed\n");
596         return false;
597 }
598
599 static bool reclock_parse(struct ctdbd_context *ctdb)
600 {
601         char line[1024];
602         char *t;
603
604         if (fgets(line, sizeof(line), stdin) == NULL) {
605                 goto fail;
606         }
607
608         if (line[0] == '\n') {
609                 /* Recovery lock remains unset */
610                 goto ok;
611         }
612
613         /* Get rid of pesky newline */
614         if ((t = strchr(line, '\n')) != NULL) {
615                 *t = '\0';
616         }
617
618         ctdb->reclock = talloc_strdup(ctdb, line);
619         if (ctdb->reclock == NULL) {
620                 goto fail;
621         }
622 ok:
623         /* Swallow possible blank line following section.  Picky
624          * compiler settings don't allow the return value to be
625          * ignored, so make the compiler happy.
626          */
627         if (fgets(line, sizeof(line), stdin) == NULL) {
628                 ;
629         }
630         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
631         return true;
632
633 fail:
634         fprintf(stderr, "Parsing reclock failed\n");
635         return false;
636 }
637
638 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
639                                        const char *dbdir)
640 {
641         struct database_map *db_map;
642
643         db_map = talloc_zero(mem_ctx, struct database_map);
644         if (db_map == NULL) {
645                 return NULL;
646         }
647
648         db_map->dbdir = talloc_strdup(db_map, dbdir);
649         if (db_map->dbdir == NULL) {
650                 talloc_free(db_map);
651                 return NULL;
652         }
653
654         return db_map;
655 }
656
657 /* Read a database map from stdin.  Each line looks like:
658  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
659  * EOF or a blank line terminates input.
660  *
661  * By default, flags and seq_num are 0
662  */
663
664 static bool dbmap_parse(struct database_map *db_map)
665 {
666         char line[1024];
667
668         while ((fgets(line, sizeof(line), stdin) != NULL)) {
669                 uint32_t id;
670                 uint8_t flags = 0;
671                 uint32_t seq_num = 0;
672                 char *tok, *t;
673                 char *name;
674                 struct database *db;
675
676                 if (line[0] == '\n') {
677                         break;
678                 }
679
680                 /* Get rid of pesky newline */
681                 if ((t = strchr(line, '\n')) != NULL) {
682                         *t = '\0';
683                 }
684
685                 /* Get ID */
686                 tok = strtok(line, " \t");
687                 if (tok == NULL) {
688                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
689                         continue;
690                 }
691                 id = (uint32_t)strtoul(tok, NULL, 0);
692
693                 /* Get NAME */
694                 tok = strtok(NULL, " \t");
695                 if (tok == NULL) {
696                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
697                         continue;
698                 }
699                 name = talloc_strdup(db_map, tok);
700                 if (name == NULL) {
701                         goto fail;
702                 }
703
704                 /* Get flags */
705                 tok = strtok(NULL, " \t");
706                 while (tok != NULL) {
707                         if (strcmp(tok, "PERSISTENT") == 0) {
708                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
709                         } else if (strcmp(tok, "STICKY") == 0) {
710                                 flags |= CTDB_DB_FLAGS_STICKY;
711                         } else if (strcmp(tok, "READONLY") == 0) {
712                                 flags |= CTDB_DB_FLAGS_READONLY;
713                         } else if (strcmp(tok, "REPLICATED") == 0) {
714                                 flags |= CTDB_DB_FLAGS_REPLICATED;
715                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
716                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
717                                              CTDB_DB_FLAGS_REPLICATED;
718
719                                 if ((flags & nv) == 0) {
720                                         fprintf(stderr,
721                                                 "seq_num for volatile db\n");
722                                         goto fail;
723                                 }
724                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
725                         }
726
727                         tok = strtok(NULL, " \t");
728                 }
729
730                 db = talloc_zero(db_map, struct database);
731                 if (db == NULL) {
732                         goto fail;
733                 }
734
735                 db->id = id;
736                 db->name = talloc_steal(db, name);
737                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
738                 if (db->path == NULL) {
739                         talloc_free(db);
740                         goto fail;
741                 }
742                 db->flags = flags;
743                 db->seq_num = seq_num;
744
745                 DLIST_ADD_END(db_map->db, db);
746         }
747
748         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
749         return true;
750
751 fail:
752         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
753         return false;
754
755 }
756
757 static struct database *database_find(struct database_map *db_map,
758                                       uint32_t db_id)
759 {
760         struct database *db;
761
762         for (db = db_map->db; db != NULL; db = db->next) {
763                 if (db->id == db_id) {
764                         return db;
765                 }
766         }
767
768         return NULL;
769 }
770
771 static int database_count(struct database_map *db_map)
772 {
773         struct database *db;
774         int count = 0;
775
776         for (db = db_map->db; db != NULL; db = db->next) {
777                 count += 1;
778         }
779
780         return count;
781 }
782
783 static int database_flags(uint8_t db_flags)
784 {
785         int tdb_flags = 0;
786
787         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
788                 tdb_flags = TDB_DEFAULT;
789         } else {
790                 /* volatile and replicated use the same flags */
791                 tdb_flags = TDB_NOSYNC |
792                             TDB_CLEAR_IF_FIRST |
793                             TDB_INCOMPATIBLE_HASH;
794         }
795
796         tdb_flags |= TDB_DISALLOW_NESTING;
797
798         return tdb_flags;
799 }
800
801 static struct database *database_new(struct database_map *db_map,
802                                      const char *name, uint8_t flags)
803 {
804         struct database *db;
805         TDB_DATA key;
806         int tdb_flags;
807
808         db = talloc_zero(db_map, struct database);
809         if (db == NULL) {
810                 return NULL;
811         }
812
813         db->name = talloc_strdup(db, name);
814         if (db->name == NULL) {
815                 goto fail;
816         }
817
818         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
819         if (db->path == NULL) {
820                 goto fail;
821         }
822
823         key.dsize = strlen(db->name) + 1;
824         key.dptr = discard_const(db->name);
825
826         db->id = tdb_jenkins_hash(&key);
827         db->flags = flags;
828
829         tdb_flags = database_flags(flags);
830
831         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
832         if (db->tdb == NULL) {
833                 DBG_ERR("tdb_open\n");
834                 goto fail;
835         }
836
837         DLIST_ADD_END(db_map->db, db);
838         return db;
839
840 fail:
841         DBG_ERR("Memory error\n");
842         talloc_free(db);
843         return NULL;
844
845 }
846
847 static int ltdb_store(struct database *db, TDB_DATA key,
848                       struct ctdb_ltdb_header *header, TDB_DATA data)
849 {
850         int ret;
851         bool db_volatile = true;
852         bool keep = false;
853
854         if (db->tdb == NULL) {
855                 return EINVAL;
856         }
857
858         if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
859             (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
860                 db_volatile = false;
861         }
862
863         if (data.dsize > 0) {
864                 keep = true;
865         } else {
866                 if (db_volatile && header->rsn == 0) {
867                         keep = true;
868                 }
869         }
870
871         if (keep) {
872                 TDB_DATA rec[2];
873
874                 rec[0].dsize = ctdb_ltdb_header_len(header);
875                 rec[0].dptr = (uint8_t *)header;
876
877                 rec[1].dsize = data.dsize;
878                 rec[1].dptr = data.dptr;
879
880                 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
881         } else {
882                 if (header->rsn > 0) {
883                         ret = tdb_delete(db->tdb, key);
884                 } else {
885                         ret = 0;
886                 }
887         }
888
889         return ret;
890 }
891
892 static int ltdb_fetch(struct database *db, TDB_DATA key,
893                       struct ctdb_ltdb_header *header,
894                       TALLOC_CTX *mem_ctx, TDB_DATA *data)
895 {
896         TDB_DATA rec;
897         size_t np;
898         int ret;
899
900         if (db->tdb == NULL) {
901                 return EINVAL;
902         }
903
904         rec = tdb_fetch(db->tdb, key);
905         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
906         if (ret != 0) {
907                 if (rec.dptr != NULL) {
908                         free(rec.dptr);
909                 }
910
911                 *header = (struct ctdb_ltdb_header) {
912                         .rsn = 0,
913                         .dmaster = 0,
914                         .flags = 0,
915                 };
916
917                 ret = ltdb_store(db, key, header, tdb_null);
918                 if (ret != 0) {
919                         return ret;
920                 }
921
922                 *data = tdb_null;
923                 return 0;
924         }
925
926         data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
927         data->dptr = talloc_memdup(mem_ctx,
928                                    rec.dptr + ctdb_ltdb_header_len(header),
929                                    data->dsize);
930
931         free(rec.dptr);
932
933         if (data->dptr == NULL) {
934                 return ENOMEM;
935         }
936
937         return 0;
938 }
939
940 static int database_seqnum(struct database *db, uint64_t *seqnum)
941 {
942         const char *keyname = CTDB_DB_SEQNUM_KEY;
943         TDB_DATA key, data;
944         struct ctdb_ltdb_header header;
945         size_t np;
946         int ret;
947
948         if (db->tdb == NULL) {
949                 *seqnum = db->seq_num;
950                 return 0;
951         }
952
953         key.dptr = discard_const(keyname);
954         key.dsize = strlen(keyname) + 1;
955
956         ret = ltdb_fetch(db, key, &header, db, &data);
957         if (ret != 0) {
958                 return ret;
959         }
960
961         if (data.dsize == 0) {
962                 *seqnum = 0;
963                 return 0;
964         }
965
966         ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
967         talloc_free(data.dptr);
968         if (ret != 0) {
969                 *seqnum = 0;
970         }
971
972         return ret;
973 }
974
975 static int ltdb_transaction_update(uint32_t reqid,
976                                    struct ctdb_ltdb_header *no_header,
977                                    TDB_DATA key, TDB_DATA data,
978                                    void *private_data)
979 {
980         struct database *db = (struct database *)private_data;
981         TALLOC_CTX *tmp_ctx = talloc_new(db);
982         struct ctdb_ltdb_header header = { 0 }, oldheader;
983         TDB_DATA olddata;
984         int ret;
985
986         if (db->tdb == NULL) {
987                 return EINVAL;
988         }
989
990         ret = ctdb_ltdb_header_extract(&data, &header);
991         if (ret != 0) {
992                 return ret;
993         }
994
995         ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
996         if (ret != 0) {
997                 return ret;
998         }
999
1000         if (olddata.dsize > 0) {
1001                 if (oldheader.rsn > header.rsn ||
1002                     (oldheader.rsn == header.rsn &&
1003                      olddata.dsize != data.dsize)) {
1004                         return -1;
1005                 }
1006         }
1007
1008         talloc_free(tmp_ctx);
1009
1010         ret = ltdb_store(db, key, &header, data);
1011         return ret;
1012 }
1013
1014 static int ltdb_transaction(struct database *db,
1015                             struct ctdb_rec_buffer *recbuf)
1016 {
1017         int ret;
1018
1019         if (db->tdb == NULL) {
1020                 return EINVAL;
1021         }
1022
1023         ret = tdb_transaction_start(db->tdb);
1024         if (ret == -1) {
1025                 return ret;
1026         }
1027
1028         ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
1029         if (ret != 0) {
1030                 tdb_transaction_cancel(db->tdb);
1031         }
1032
1033         ret = tdb_transaction_commit(db->tdb);
1034         return ret;
1035 }
1036
1037 static bool public_ips_parse(struct ctdbd_context *ctdb,
1038                              uint32_t numnodes)
1039 {
1040         bool status;
1041
1042         if (numnodes == 0) {
1043                 D_ERR("Must initialise nodemap before public IPs\n");
1044                 return false;
1045         }
1046
1047         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
1048
1049         status = (ctdb->known_ips != NULL);
1050
1051         if (status) {
1052                 D_INFO("Parsing public IPs done\n");
1053         } else {
1054                 D_INFO("Parsing public IPs failed\n");
1055         }
1056
1057         return status;
1058 }
1059
1060 /* Read information about controls to fail.  Format is:
1061  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
1062  */
1063 static bool control_failures_parse(struct ctdbd_context *ctdb)
1064 {
1065         char line[1024];
1066
1067         while ((fgets(line, sizeof(line), stdin) != NULL)) {
1068                 char *tok, *t;
1069                 enum ctdb_controls opcode;
1070                 uint32_t pnn;
1071                 const char *error;
1072                 const char *comment;
1073                 struct fake_control_failure *failure = NULL;
1074
1075                 if (line[0] == '\n') {
1076                         break;
1077                 }
1078
1079                 /* Get rid of pesky newline */
1080                 if ((t = strchr(line, '\n')) != NULL) {
1081                         *t = '\0';
1082                 }
1083
1084                 /* Get opcode */
1085                 tok = strtok(line, " \t");
1086                 if (tok == NULL) {
1087                         D_ERR("bad line (%s) - missing opcode\n", line);
1088                         continue;
1089                 }
1090                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1091
1092                 /* Get PNN */
1093                 tok = strtok(NULL, " \t");
1094                 if (tok == NULL) {
1095                         D_ERR("bad line (%s) - missing PNN\n", line);
1096                         continue;
1097                 }
1098                 pnn = (uint32_t)strtoul(tok, NULL, 0);
1099
1100                 /* Get error */
1101                 tok = strtok(NULL, " \t");
1102                 if (tok == NULL) {
1103                         D_ERR("bad line (%s) - missing errno\n", line);
1104                         continue;
1105                 }
1106                 error = talloc_strdup(ctdb, tok);
1107                 if (error == NULL) {
1108                         goto fail;
1109                 }
1110                 if (strcmp(error, "ERROR") != 0 &&
1111                     strcmp(error, "TIMEOUT") != 0) {
1112                         D_ERR("bad line (%s) "
1113                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1114                               line);
1115                         goto fail;
1116                 }
1117
1118                 /* Get comment */
1119                 tok = strtok(NULL, "\n"); /* rest of line */
1120                 if (tok == NULL) {
1121                         D_ERR("bad line (%s) - missing comment\n", line);
1122                         continue;
1123                 }
1124                 comment = talloc_strdup(ctdb, tok);
1125                 if (comment == NULL) {
1126                         goto fail;
1127                 }
1128
1129                 failure = talloc_zero(ctdb, struct fake_control_failure);
1130                 if (failure == NULL) {
1131                         goto fail;
1132                 }
1133
1134                 failure->opcode = opcode;
1135                 failure->pnn = pnn;
1136                 failure->error = error;
1137                 failure->comment = comment;
1138
1139                 DLIST_ADD(ctdb->control_failures, failure);
1140         }
1141
1142         D_INFO("Parsing fake control failures done\n");
1143         return true;
1144
1145 fail:
1146         D_INFO("Parsing fake control failures failed\n");
1147         return false;
1148 }
1149
1150 /*
1151  * Manage clients
1152  */
1153
1154 static int ctdb_client_destructor(struct ctdb_client *client)
1155 {
1156         DLIST_REMOVE(client->ctdb->client_list, client);
1157         return 0;
1158 }
1159
1160 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1161                       void *client_state)
1162 {
1163         struct ctdb_client *client;
1164
1165         client = talloc_zero(client_state, struct ctdb_client);
1166         if (client == NULL) {
1167                 return ENOMEM;
1168         }
1169
1170         client->ctdb = ctdb;
1171         client->pid = client_pid;
1172         client->state = client_state;
1173
1174         DLIST_ADD(ctdb->client_list, client);
1175         talloc_set_destructor(client, ctdb_client_destructor);
1176         return 0;
1177 }
1178
1179 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1180 {
1181         struct ctdb_client *client;
1182
1183         for (client=ctdb->client_list; client != NULL; client=client->next) {
1184                 if (client->pid == client_pid) {
1185                         return client->state;
1186                 }
1187         }
1188
1189         return NULL;
1190 }
1191
1192 /*
1193  * CTDB context setup
1194  */
1195
1196 static uint32_t new_generation(uint32_t old_generation)
1197 {
1198         uint32_t generation;
1199
1200         while (1) {
1201                 generation = random();
1202                 if (generation != INVALID_GENERATION &&
1203                     generation != old_generation) {
1204                         break;
1205                 }
1206         }
1207
1208         return generation;
1209 }
1210
1211 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1212                                          const char *dbdir)
1213 {
1214         struct ctdbd_context *ctdb;
1215         char line[1024];
1216         bool status;
1217         int ret;
1218
1219         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1220         if (ctdb == NULL) {
1221                 return NULL;
1222         }
1223
1224         ctdb->node_map = nodemap_init(ctdb);
1225         if (ctdb->node_map == NULL) {
1226                 goto fail;
1227         }
1228
1229         ctdb->iface_map = interfaces_init(ctdb);
1230         if (ctdb->iface_map == NULL) {
1231                 goto fail;
1232         }
1233
1234         ctdb->vnn_map = vnnmap_init(ctdb);
1235         if (ctdb->vnn_map == NULL) {
1236                 goto fail;
1237         }
1238
1239         ctdb->db_map = dbmap_init(ctdb, dbdir);
1240         if (ctdb->db_map == NULL) {
1241                 goto fail;
1242         }
1243
1244         ret = srvid_init(ctdb, &ctdb->srv);
1245         if (ret != 0) {
1246                 goto fail;
1247         }
1248
1249         while (fgets(line, sizeof(line), stdin) != NULL) {
1250                 char *t;
1251
1252                 if ((t = strchr(line, '\n')) != NULL) {
1253                         *t = '\0';
1254                 }
1255
1256                 if (strcmp(line, "NODEMAP") == 0) {
1257                         status = nodemap_parse(ctdb->node_map);
1258                 } else if (strcmp(line, "IFACES") == 0) {
1259                         status = interfaces_parse(ctdb->iface_map);
1260                 } else if (strcmp(line, "VNNMAP") == 0) {
1261                         status = vnnmap_parse(ctdb->vnn_map);
1262                 } else if (strcmp(line, "DBMAP") == 0) {
1263                         status = dbmap_parse(ctdb->db_map);
1264                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1265                         status = public_ips_parse(ctdb,
1266                                                   ctdb->node_map->num_nodes);
1267                 } else if (strcmp(line, "RECLOCK") == 0) {
1268                         status = reclock_parse(ctdb);
1269                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1270                         status = control_failures_parse(ctdb);
1271                 } else {
1272                         fprintf(stderr, "Unknown line %s\n", line);
1273                         status = false;
1274                 }
1275
1276                 if (! status) {
1277                         goto fail;
1278                 }
1279         }
1280
1281         ctdb->start_time = tevent_timeval_current();
1282         ctdb->recovery_start_time = tevent_timeval_current();
1283         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1284         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1285                 ctdb->vnn_map->generation =
1286                         new_generation(ctdb->vnn_map->generation);
1287         }
1288         ctdb->recovery_end_time = tevent_timeval_current();
1289
1290         ctdb->log_level = DEBUG_ERR;
1291         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1292
1293         ctdb_tunable_set_defaults(&ctdb->tun_list);
1294
1295         return ctdb;
1296
1297 fail:
1298         TALLOC_FREE(ctdb);
1299         return NULL;
1300 }
1301
1302 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1303 {
1304         struct node *node;
1305         unsigned int i;
1306
1307         if (ctdb->node_map->num_nodes == 0) {
1308                 return true;
1309         }
1310
1311         /* Make sure all the nodes are in order */
1312         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1313                 node = &ctdb->node_map->node[i];
1314                 if (node->pnn != i) {
1315                         fprintf(stderr, "Expected node %u, found %u\n",
1316                                 i, node->pnn);
1317                         return false;
1318                 }
1319         }
1320
1321         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1322         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1323                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1324                 exit(0);
1325         }
1326
1327         return true;
1328 }
1329
1330 /*
1331  * Doing a recovery
1332  */
1333
1334 struct recover_state {
1335         struct tevent_context *ev;
1336         struct ctdbd_context *ctdb;
1337 };
1338
1339 static int recover_check(struct tevent_req *req);
1340 static void recover_wait_done(struct tevent_req *subreq);
1341 static void recover_done(struct tevent_req *subreq);
1342
1343 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1344                                        struct tevent_context *ev,
1345                                        struct ctdbd_context *ctdb)
1346 {
1347         struct tevent_req *req;
1348         struct recover_state *state;
1349         int ret;
1350
1351         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1352         if (req == NULL) {
1353                 return NULL;
1354         }
1355
1356         state->ev = ev;
1357         state->ctdb = ctdb;
1358
1359         ret = recover_check(req);
1360         if (ret != 0) {
1361                 tevent_req_error(req, ret);
1362                 return tevent_req_post(req, ev);
1363         }
1364
1365         return req;
1366 }
1367
1368 static int recover_check(struct tevent_req *req)
1369 {
1370         struct recover_state *state = tevent_req_data(
1371                 req, struct recover_state);
1372         struct ctdbd_context *ctdb = state->ctdb;
1373         struct tevent_req *subreq;
1374         bool recovery_disabled;
1375         unsigned int i;
1376
1377         recovery_disabled = false;
1378         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1379                 if (ctdb->node_map->node[i].recovery_disabled) {
1380                         recovery_disabled = true;
1381                         break;
1382                 }
1383         }
1384
1385         subreq = tevent_wakeup_send(state, state->ev,
1386                                     tevent_timeval_current_ofs(1, 0));
1387         if (subreq == NULL) {
1388                 return ENOMEM;
1389         }
1390
1391         if (recovery_disabled) {
1392                 tevent_req_set_callback(subreq, recover_wait_done, req);
1393         } else {
1394                 ctdb->recovery_start_time = tevent_timeval_current();
1395                 tevent_req_set_callback(subreq, recover_done, req);
1396         }
1397
1398         return 0;
1399 }
1400
1401 static void recover_wait_done(struct tevent_req *subreq)
1402 {
1403         struct tevent_req *req = tevent_req_callback_data(
1404                 subreq, struct tevent_req);
1405         int ret;
1406         bool status;
1407
1408         status = tevent_wakeup_recv(subreq);
1409         TALLOC_FREE(subreq);
1410         if (! status) {
1411                 tevent_req_error(req, EIO);
1412                 return;
1413         }
1414
1415         ret = recover_check(req);
1416         if (ret != 0) {
1417                 tevent_req_error(req, ret);
1418         }
1419 }
1420
1421 static void recover_done(struct tevent_req *subreq)
1422 {
1423         struct tevent_req *req = tevent_req_callback_data(
1424                 subreq, struct tevent_req);
1425         struct recover_state *state = tevent_req_data(
1426                 req, struct recover_state);
1427         struct ctdbd_context *ctdb = state->ctdb;
1428         bool status;
1429
1430         status = tevent_wakeup_recv(subreq);
1431         TALLOC_FREE(subreq);
1432         if (! status) {
1433                 tevent_req_error(req, EIO);
1434                 return;
1435         }
1436
1437         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1438         ctdb->recovery_end_time = tevent_timeval_current();
1439         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1440
1441         tevent_req_done(req);
1442 }
1443
1444 static bool recover_recv(struct tevent_req *req, int *perr)
1445 {
1446         int err;
1447
1448         if (tevent_req_is_unix_error(req, &err)) {
1449                 if (perr != NULL) {
1450                         *perr = err;
1451                 }
1452                 return false;
1453         }
1454
1455         return true;
1456 }
1457
1458 /*
1459  * Routines for ctdb_req_header
1460  */
1461
1462 static void header_fix_pnn(struct ctdb_req_header *header,
1463                            struct ctdbd_context *ctdb)
1464 {
1465         if (header->srcnode == CTDB_CURRENT_NODE) {
1466                 header->srcnode = ctdb->node_map->pnn;
1467         }
1468
1469         if (header->destnode == CTDB_CURRENT_NODE) {
1470                 header->destnode = ctdb->node_map->pnn;
1471         }
1472 }
1473
1474 static struct ctdb_req_header header_reply_call(
1475                                         struct ctdb_req_header *header,
1476                                         struct ctdbd_context *ctdb)
1477 {
1478         struct ctdb_req_header reply_header;
1479
1480         reply_header = (struct ctdb_req_header) {
1481                 .ctdb_magic = CTDB_MAGIC,
1482                 .ctdb_version = CTDB_PROTOCOL,
1483                 .generation = ctdb->vnn_map->generation,
1484                 .operation = CTDB_REPLY_CALL,
1485                 .destnode = header->srcnode,
1486                 .srcnode = header->destnode,
1487                 .reqid = header->reqid,
1488         };
1489
1490         return reply_header;
1491 }
1492
1493 static struct ctdb_req_header header_reply_control(
1494                                         struct ctdb_req_header *header,
1495                                         struct ctdbd_context *ctdb)
1496 {
1497         struct ctdb_req_header reply_header;
1498
1499         reply_header = (struct ctdb_req_header) {
1500                 .ctdb_magic = CTDB_MAGIC,
1501                 .ctdb_version = CTDB_PROTOCOL,
1502                 .generation = ctdb->vnn_map->generation,
1503                 .operation = CTDB_REPLY_CONTROL,
1504                 .destnode = header->srcnode,
1505                 .srcnode = header->destnode,
1506                 .reqid = header->reqid,
1507         };
1508
1509         return reply_header;
1510 }
1511
1512 static struct ctdb_req_header header_reply_message(
1513                                         struct ctdb_req_header *header,
1514                                         struct ctdbd_context *ctdb)
1515 {
1516         struct ctdb_req_header reply_header;
1517
1518         reply_header = (struct ctdb_req_header) {
1519                 .ctdb_magic = CTDB_MAGIC,
1520                 .ctdb_version = CTDB_PROTOCOL,
1521                 .generation = ctdb->vnn_map->generation,
1522                 .operation = CTDB_REQ_MESSAGE,
1523                 .destnode = header->srcnode,
1524                 .srcnode = header->destnode,
1525                 .reqid = 0,
1526         };
1527
1528         return reply_header;
1529 }
1530
1531 /*
1532  * Client state
1533  */
1534
1535 struct client_state {
1536         struct tevent_context *ev;
1537         int fd;
1538         struct ctdbd_context *ctdb;
1539         int pnn;
1540         pid_t pid;
1541         struct comm_context *comm;
1542         struct srvid_register_state *rstate;
1543         int status;
1544 };
1545
1546 /*
1547  * Send replies to call, controls and messages
1548  */
1549
1550 static void client_reply_done(struct tevent_req *subreq);
1551
1552 static void client_send_call(struct tevent_req *req,
1553                              struct ctdb_req_header *header,
1554                              struct ctdb_reply_call *reply)
1555 {
1556         struct client_state *state = tevent_req_data(
1557                 req, struct client_state);
1558         struct ctdbd_context *ctdb = state->ctdb;
1559         struct tevent_req *subreq;
1560         struct ctdb_req_header reply_header;
1561         uint8_t *buf;
1562         size_t datalen, buflen;
1563         int ret;
1564
1565         reply_header = header_reply_call(header, ctdb);
1566
1567         datalen = ctdb_reply_call_len(&reply_header, reply);
1568         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1569         if (ret != 0) {
1570                 tevent_req_error(req, ret);
1571                 return;
1572         }
1573
1574         ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
1575         if (ret != 0) {
1576                 tevent_req_error(req, ret);
1577                 return;
1578         }
1579
1580         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1581         if (tevent_req_nomem(subreq, req)) {
1582                 return;
1583         }
1584         tevent_req_set_callback(subreq, client_reply_done, req);
1585
1586         talloc_steal(subreq, buf);
1587 }
1588
1589 static void client_send_message(struct tevent_req *req,
1590                                 struct ctdb_req_header *header,
1591                                 struct ctdb_req_message_data *message)
1592 {
1593         struct client_state *state = tevent_req_data(
1594                 req, struct client_state);
1595         struct ctdbd_context *ctdb = state->ctdb;
1596         struct tevent_req *subreq;
1597         struct ctdb_req_header reply_header;
1598         uint8_t *buf;
1599         size_t datalen, buflen;
1600         int ret;
1601
1602         reply_header = header_reply_message(header, ctdb);
1603
1604         datalen = ctdb_req_message_data_len(&reply_header, message);
1605         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1606         if (ret != 0) {
1607                 tevent_req_error(req, ret);
1608                 return;
1609         }
1610
1611         ret = ctdb_req_message_data_push(&reply_header, message,
1612                                          buf, &buflen);
1613         if (ret != 0) {
1614                 tevent_req_error(req, ret);
1615                 return;
1616         }
1617
1618         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1619
1620         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1621         if (tevent_req_nomem(subreq, req)) {
1622                 return;
1623         }
1624         tevent_req_set_callback(subreq, client_reply_done, req);
1625
1626         talloc_steal(subreq, buf);
1627 }
1628
1629 static void client_send_control(struct tevent_req *req,
1630                                 struct ctdb_req_header *header,
1631                                 struct ctdb_reply_control *reply)
1632 {
1633         struct client_state *state = tevent_req_data(
1634                 req, struct client_state);
1635         struct ctdbd_context *ctdb = state->ctdb;
1636         struct tevent_req *subreq;
1637         struct ctdb_req_header reply_header;
1638         uint8_t *buf;
1639         size_t datalen, buflen;
1640         int ret;
1641
1642         reply_header = header_reply_control(header, ctdb);
1643
1644         datalen = ctdb_reply_control_len(&reply_header, reply);
1645         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1646         if (ret != 0) {
1647                 tevent_req_error(req, ret);
1648                 return;
1649         }
1650
1651         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1652         if (ret != 0) {
1653                 tevent_req_error(req, ret);
1654                 return;
1655         }
1656
1657         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1658
1659         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1660         if (tevent_req_nomem(subreq, req)) {
1661                 return;
1662         }
1663         tevent_req_set_callback(subreq, client_reply_done, req);
1664
1665         talloc_steal(subreq, buf);
1666 }
1667
1668 static void client_reply_done(struct tevent_req *subreq)
1669 {
1670         struct tevent_req *req = tevent_req_callback_data(
1671                 subreq, struct tevent_req);
1672         int ret;
1673         bool status;
1674
1675         status = comm_write_recv(subreq, &ret);
1676         TALLOC_FREE(subreq);
1677         if (! status) {
1678                 tevent_req_error(req, ret);
1679         }
1680 }
1681
1682 /*
1683  * Handling protocol - controls
1684  */
1685
1686 static void control_process_exists(TALLOC_CTX *mem_ctx,
1687                                    struct tevent_req *req,
1688                                    struct ctdb_req_header *header,
1689                                    struct ctdb_req_control *request)
1690 {
1691         struct client_state *state = tevent_req_data(
1692                 req, struct client_state);
1693         struct ctdbd_context *ctdb = state->ctdb;
1694         struct client_state *cstate;
1695         struct ctdb_reply_control reply;
1696
1697         reply.rdata.opcode = request->opcode;
1698
1699         cstate = client_find(ctdb, request->rdata.data.pid);
1700         if (cstate == NULL) {
1701                 reply.status = -1;
1702                 reply.errmsg = "No client for PID";
1703         } else {
1704                 reply.status = kill(request->rdata.data.pid, 0);
1705                 reply.errmsg = NULL;
1706         }
1707
1708         client_send_control(req, header, &reply);
1709 }
1710
1711 static void control_ping(TALLOC_CTX *mem_ctx,
1712                          struct tevent_req *req,
1713                          struct ctdb_req_header *header,
1714                          struct ctdb_req_control *request)
1715 {
1716         struct client_state *state = tevent_req_data(
1717                 req, struct client_state);
1718         struct ctdbd_context *ctdb = state->ctdb;
1719         struct ctdb_reply_control reply;
1720
1721         reply.rdata.opcode = request->opcode;
1722         reply.status = ctdb->num_clients;
1723         reply.errmsg = NULL;
1724
1725         client_send_control(req, header, &reply);
1726 }
1727
1728 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1729                               struct tevent_req *req,
1730                               struct ctdb_req_header *header,
1731                               struct ctdb_req_control *request)
1732 {
1733         struct client_state *state = tevent_req_data(
1734                 req, struct client_state);
1735         struct ctdbd_context *ctdb = state->ctdb;
1736         struct ctdb_reply_control reply;
1737         struct database *db;
1738
1739         reply.rdata.opcode = request->opcode;
1740
1741         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1742         if (db == NULL) {
1743                 reply.status = ENOENT;
1744                 reply.errmsg = "Database not found";
1745         } else {
1746                 reply.rdata.data.db_path =
1747                         talloc_strdup(mem_ctx, db->path);
1748                 if (reply.rdata.data.db_path == NULL) {
1749                         reply.status = ENOMEM;
1750                         reply.errmsg = "Memory error";
1751                 } else {
1752                         reply.status = 0;
1753                         reply.errmsg = NULL;
1754                 }
1755         }
1756
1757         client_send_control(req, header, &reply);
1758 }
1759
1760 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1761                               struct tevent_req *req,
1762                               struct ctdb_req_header *header,
1763                               struct ctdb_req_control *request)
1764 {
1765         struct client_state *state = tevent_req_data(
1766                 req, struct client_state);
1767         struct ctdbd_context *ctdb = state->ctdb;
1768         struct ctdb_reply_control reply;
1769         struct ctdb_vnn_map *vnnmap;
1770
1771         reply.rdata.opcode = request->opcode;
1772
1773         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1774         if (vnnmap == NULL) {
1775                 reply.status = ENOMEM;
1776                 reply.errmsg = "Memory error";
1777         } else {
1778                 vnnmap->generation = ctdb->vnn_map->generation;
1779                 vnnmap->size = ctdb->vnn_map->size;
1780                 vnnmap->map = ctdb->vnn_map->map;
1781
1782                 reply.rdata.data.vnnmap = vnnmap;
1783                 reply.status = 0;
1784                 reply.errmsg = NULL;
1785         }
1786
1787         client_send_control(req, header, &reply);
1788 }
1789
1790 static void control_get_debug(TALLOC_CTX *mem_ctx,
1791                               struct tevent_req *req,
1792                               struct ctdb_req_header *header,
1793                               struct ctdb_req_control *request)
1794 {
1795         struct client_state *state = tevent_req_data(
1796                 req, struct client_state);
1797         struct ctdbd_context *ctdb = state->ctdb;
1798         struct ctdb_reply_control reply;
1799
1800         reply.rdata.opcode = request->opcode;
1801         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1802         reply.status = 0;
1803         reply.errmsg = NULL;
1804
1805         client_send_control(req, header, &reply);
1806 }
1807
1808 static void control_set_debug(TALLOC_CTX *mem_ctx,
1809                               struct tevent_req *req,
1810                               struct ctdb_req_header *header,
1811                               struct ctdb_req_control *request)
1812 {
1813         struct client_state *state = tevent_req_data(
1814                 req, struct client_state);
1815         struct ctdbd_context *ctdb = state->ctdb;
1816         struct ctdb_reply_control reply;
1817
1818         ctdb->log_level = (int)request->rdata.data.loglevel;
1819
1820         reply.rdata.opcode = request->opcode;
1821         reply.status = 0;
1822         reply.errmsg = NULL;
1823
1824         client_send_control(req, header, &reply);
1825 }
1826
1827 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1828                               struct tevent_req *req,
1829                                struct ctdb_req_header *header,
1830                               struct ctdb_req_control *request)
1831 {
1832         struct client_state *state = tevent_req_data(
1833                 req, struct client_state);
1834         struct ctdbd_context *ctdb = state->ctdb;
1835         struct ctdb_reply_control reply;
1836         struct ctdb_dbid_map *dbmap;
1837         struct database *db;
1838         unsigned int i;
1839
1840         reply.rdata.opcode = request->opcode;
1841
1842         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1843         if (dbmap == NULL) {
1844                 goto fail;
1845         }
1846
1847         dbmap->num = database_count(ctdb->db_map);
1848         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1849         if (dbmap->dbs == NULL) {
1850                 goto fail;
1851         }
1852
1853         db = ctdb->db_map->db;
1854         for (i = 0; i < dbmap->num; i++) {
1855                 dbmap->dbs[i] = (struct ctdb_dbid) {
1856                         .db_id = db->id,
1857                         .flags = db->flags,
1858                 };
1859
1860                 db = db->next;
1861         }
1862
1863         reply.rdata.data.dbmap = dbmap;
1864         reply.status = 0;
1865         reply.errmsg = NULL;
1866         client_send_control(req, header, &reply);
1867         return;
1868
1869 fail:
1870         reply.status = -1;
1871         reply.errmsg = "Memory error";
1872         client_send_control(req, header, &reply);
1873 }
1874
1875 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1876                                 struct tevent_req *req,
1877                                 struct ctdb_req_header *header,
1878                                 struct ctdb_req_control *request)
1879 {
1880         struct client_state *state = tevent_req_data(
1881                 req, struct client_state);
1882         struct ctdbd_context *ctdb = state->ctdb;
1883         struct ctdb_reply_control reply;
1884
1885         reply.rdata.opcode = request->opcode;
1886         reply.status = ctdb->vnn_map->recmode;
1887         reply.errmsg = NULL;
1888
1889         client_send_control(req, header, &reply);
1890 }
1891
1892 struct set_recmode_state {
1893         struct tevent_req *req;
1894         struct ctdbd_context *ctdb;
1895         struct ctdb_req_header header;
1896         struct ctdb_reply_control reply;
1897 };
1898
1899 static void set_recmode_callback(struct tevent_req *subreq)
1900 {
1901         struct set_recmode_state *substate = tevent_req_callback_data(
1902                 subreq, struct set_recmode_state);
1903         bool status;
1904         int ret;
1905
1906         status = recover_recv(subreq, &ret);
1907         TALLOC_FREE(subreq);
1908         if (! status) {
1909                 substate->reply.status = ret;
1910                 substate->reply.errmsg = "recovery failed";
1911         } else {
1912                 substate->reply.status = 0;
1913                 substate->reply.errmsg = NULL;
1914         }
1915
1916         client_send_control(substate->req, &substate->header, &substate->reply);
1917         talloc_free(substate);
1918 }
1919
1920 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1921                                 struct tevent_req *req,
1922                                 struct ctdb_req_header *header,
1923                                 struct ctdb_req_control *request)
1924 {
1925         struct client_state *state = tevent_req_data(
1926                 req, struct client_state);
1927         struct tevent_req *subreq;
1928         struct ctdbd_context *ctdb = state->ctdb;
1929         struct set_recmode_state *substate;
1930         struct ctdb_reply_control reply;
1931
1932         reply.rdata.opcode = request->opcode;
1933
1934         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1935                 reply.status = -1;
1936                 reply.errmsg = "Client cannot set recmode to NORMAL";
1937                 goto fail;
1938         }
1939
1940         substate = talloc_zero(ctdb, struct set_recmode_state);
1941         if (substate == NULL) {
1942                 reply.status = -1;
1943                 reply.errmsg = "Memory error";
1944                 goto fail;
1945         }
1946
1947         substate->req = req;
1948         substate->ctdb = ctdb;
1949         substate->header = *header;
1950         substate->reply.rdata.opcode = request->opcode;
1951
1952         subreq = recover_send(substate, state->ev, state->ctdb);
1953         if (subreq == NULL) {
1954                 talloc_free(substate);
1955                 goto fail;
1956         }
1957         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1958
1959         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1960         return;
1961
1962 fail:
1963         client_send_control(req, header, &reply);
1964
1965 }
1966
1967 static void control_db_attach(TALLOC_CTX *mem_ctx,
1968                               struct tevent_req *req,
1969                               struct ctdb_req_header *header,
1970                               struct ctdb_req_control *request)
1971 {
1972         struct client_state *state = tevent_req_data(
1973                 req, struct client_state);
1974         struct ctdbd_context *ctdb = state->ctdb;
1975         struct ctdb_reply_control reply;
1976         struct database *db;
1977
1978         reply.rdata.opcode = request->opcode;
1979
1980         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
1981                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
1982                         goto done;
1983                 }
1984         }
1985
1986         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
1987         if (db == NULL) {
1988                 reply.status = -1;
1989                 reply.errmsg = "Failed to attach database";
1990                 client_send_control(req, header, &reply);
1991                 return;
1992         }
1993
1994 done:
1995         reply.rdata.data.db_id = db->id;
1996         reply.status = 0;
1997         reply.errmsg = NULL;
1998         client_send_control(req, header, &reply);
1999 }
2000
2001 static void srvid_handler_done(struct tevent_req *subreq);
2002
2003 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2004 {
2005         struct client_state *state = talloc_get_type_abort(
2006                 private_data, struct client_state);
2007         struct ctdbd_context *ctdb = state->ctdb;
2008         struct tevent_req *subreq;
2009         struct ctdb_req_header request_header;
2010         struct ctdb_req_message_data message;
2011         uint8_t *buf;
2012         size_t datalen, buflen;
2013         int ret;
2014
2015         request_header = (struct ctdb_req_header) {
2016                 .ctdb_magic = CTDB_MAGIC,
2017                 .ctdb_version = CTDB_PROTOCOL,
2018                 .generation = ctdb->vnn_map->generation,
2019                 .operation = CTDB_REQ_MESSAGE,
2020                 .destnode = state->pnn,
2021                 .srcnode = state->pnn,
2022                 .reqid = 0,
2023         };
2024
2025         message = (struct ctdb_req_message_data) {
2026                 .srvid = srvid,
2027                 .data = data,
2028         };
2029
2030         datalen = ctdb_req_message_data_len(&request_header, &message);
2031         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
2032         if (ret != 0) {
2033                 return;
2034         }
2035
2036         ret = ctdb_req_message_data_push(&request_header,
2037                                          &message,
2038                                          buf,
2039                                          &buflen);
2040         if (ret != 0) {
2041                 talloc_free(buf);
2042                 return;
2043         }
2044
2045         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
2046         if (subreq == NULL) {
2047                 talloc_free(buf);
2048                 return;
2049         }
2050         tevent_req_set_callback(subreq, srvid_handler_done, state);
2051
2052         talloc_steal(subreq, buf);
2053 }
2054
2055 static void srvid_handler_done(struct tevent_req *subreq)
2056 {
2057         struct client_state *state = tevent_req_data(
2058                 subreq, struct client_state);
2059         int ret;
2060         bool ok;
2061
2062         ok = comm_write_recv(subreq, &ret);
2063         TALLOC_FREE(subreq);
2064         if (!ok) {
2065                 DEBUG(DEBUG_ERR,
2066                       ("Failed to dispatch message to client pid=%u, ret=%d\n",
2067                        state->pid,
2068                        ret));
2069         }
2070 }
2071
2072 static void control_register_srvid(TALLOC_CTX *mem_ctx,
2073                                    struct tevent_req *req,
2074                                    struct ctdb_req_header *header,
2075                                    struct ctdb_req_control *request)
2076 {
2077         struct client_state *state = tevent_req_data(
2078                 req, struct client_state);
2079         struct ctdbd_context *ctdb = state->ctdb;
2080         struct ctdb_reply_control reply;
2081         int ret;
2082
2083         reply.rdata.opcode = request->opcode;
2084
2085         ret = srvid_register(ctdb->srv, state, request->srvid,
2086                              srvid_handler, state);
2087         if (ret != 0) {
2088                 reply.status = -1;
2089                 reply.errmsg = "Memory error";
2090                 goto fail;
2091         }
2092
2093         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
2094
2095         reply.status = 0;
2096         reply.errmsg = NULL;
2097
2098 fail:
2099         client_send_control(req, header, &reply);
2100 }
2101
2102 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
2103                                      struct tevent_req *req,
2104                                      struct ctdb_req_header *header,
2105                                      struct ctdb_req_control *request)
2106 {
2107         struct client_state *state = tevent_req_data(
2108                 req, struct client_state);
2109         struct ctdbd_context *ctdb = state->ctdb;
2110         struct ctdb_reply_control reply;
2111         int ret;
2112
2113         reply.rdata.opcode = request->opcode;
2114
2115         ret = srvid_deregister(ctdb->srv, request->srvid, state);
2116         if (ret != 0) {
2117                 reply.status = -1;
2118                 reply.errmsg = "srvid not registered";
2119                 goto fail;
2120         }
2121
2122         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
2123
2124         reply.status = 0;
2125         reply.errmsg = NULL;
2126
2127 fail:
2128         client_send_control(req, header, &reply);
2129 }
2130
2131 static void control_get_dbname(TALLOC_CTX *mem_ctx,
2132                                struct tevent_req *req,
2133                                struct ctdb_req_header *header,
2134                                struct ctdb_req_control *request)
2135 {
2136         struct client_state *state = tevent_req_data(
2137                 req, struct client_state);
2138         struct ctdbd_context *ctdb = state->ctdb;
2139         struct ctdb_reply_control reply;
2140         struct database *db;
2141
2142         reply.rdata.opcode = request->opcode;
2143
2144         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2145         if (db == NULL) {
2146                 reply.status = ENOENT;
2147                 reply.errmsg = "Database not found";
2148         } else {
2149                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
2150                 if (reply.rdata.data.db_name == NULL) {
2151                         reply.status = ENOMEM;
2152                         reply.errmsg = "Memory error";
2153                 } else {
2154                         reply.status = 0;
2155                         reply.errmsg = NULL;
2156                 }
2157         }
2158
2159         client_send_control(req, header, &reply);
2160 }
2161
2162 static void control_get_pid(TALLOC_CTX *mem_ctx,
2163                             struct tevent_req *req,
2164                             struct ctdb_req_header *header,
2165                             struct ctdb_req_control *request)
2166 {
2167         struct ctdb_reply_control reply;
2168
2169         reply.rdata.opcode = request->opcode;
2170         reply.status = getpid();
2171         reply.errmsg = NULL;
2172
2173         client_send_control(req, header, &reply);
2174 }
2175
2176 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
2177                                   struct tevent_req *req,
2178                                   struct ctdb_req_header *header,
2179                                   struct ctdb_req_control *request)
2180 {
2181         struct client_state *state = tevent_req_data(
2182                 req, struct client_state);
2183         struct ctdbd_context *ctdb = state->ctdb;
2184         struct ctdb_reply_control reply;
2185
2186         reply.rdata.opcode = request->opcode;
2187         reply.status = ctdb->node_map->recmaster;
2188         reply.errmsg = NULL;
2189
2190         client_send_control(req, header, &reply);
2191 }
2192
2193 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2194                             struct tevent_req *req,
2195                             struct ctdb_req_header *header,
2196                             struct ctdb_req_control *request)
2197 {
2198         struct ctdb_reply_control reply;
2199
2200         reply.rdata.opcode = request->opcode;
2201         reply.status = header->destnode;
2202         reply.errmsg = NULL;
2203
2204         client_send_control(req, header, &reply);
2205 }
2206
2207 static void control_shutdown(TALLOC_CTX *mem_ctx,
2208                              struct tevent_req *req,
2209                              struct ctdb_req_header *hdr,
2210                              struct ctdb_req_control *request)
2211 {
2212         struct client_state *state = tevent_req_data(
2213                 req, struct client_state);
2214
2215         state->status = 99;
2216 }
2217
2218 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2219                                 struct tevent_req *req,
2220                                 struct ctdb_req_header *header,
2221                                 struct ctdb_req_control *request)
2222 {
2223         struct client_state *state = tevent_req_data(
2224                 req, struct client_state);
2225         struct ctdbd_context *ctdb = state->ctdb;
2226         struct ctdb_reply_control reply;
2227         bool ret, obsolete;
2228
2229         reply.rdata.opcode = request->opcode;
2230         reply.errmsg = NULL;
2231
2232         ret = ctdb_tunable_set_value(&ctdb->tun_list,
2233                                      request->rdata.data.tunable->name,
2234                                      request->rdata.data.tunable->value,
2235                                      &obsolete);
2236         if (! ret) {
2237                 reply.status = -1;
2238         } else if (obsolete) {
2239                 reply.status = 1;
2240         } else {
2241                 reply.status = 0;
2242         }
2243
2244         client_send_control(req, header, &reply);
2245 }
2246
2247 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2248                                 struct tevent_req *req,
2249                                 struct ctdb_req_header *header,
2250                                 struct ctdb_req_control *request)
2251 {
2252         struct client_state *state = tevent_req_data(
2253                 req, struct client_state);
2254         struct ctdbd_context *ctdb = state->ctdb;
2255         struct ctdb_reply_control reply;
2256         uint32_t value;
2257         bool ret;
2258
2259         reply.rdata.opcode = request->opcode;
2260         reply.errmsg = NULL;
2261
2262         ret = ctdb_tunable_get_value(&ctdb->tun_list,
2263                                      request->rdata.data.tun_var, &value);
2264         if (! ret) {
2265                 reply.status = -1;
2266         } else {
2267                 reply.rdata.data.tun_value = value;
2268                 reply.status = 0;
2269         }
2270
2271         client_send_control(req, header, &reply);
2272 }
2273
2274 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2275                                   struct tevent_req *req,
2276                                   struct ctdb_req_header *header,
2277                                   struct ctdb_req_control *request)
2278 {
2279         struct ctdb_reply_control reply;
2280         struct ctdb_var_list *var_list;
2281
2282         reply.rdata.opcode = request->opcode;
2283         reply.errmsg = NULL;
2284
2285         var_list = ctdb_tunable_names(mem_ctx);
2286         if (var_list == NULL) {
2287                 reply.status = -1;
2288         } else {
2289                 reply.rdata.data.tun_var_list = var_list;
2290                 reply.status = 0;
2291         }
2292
2293         client_send_control(req, header, &reply);
2294 }
2295
2296 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2297                                  struct tevent_req *req,
2298                                  struct ctdb_req_header *header,
2299                                  struct ctdb_req_control *request)
2300 {
2301         struct client_state *state = tevent_req_data(
2302                 req, struct client_state);
2303         struct ctdbd_context *ctdb = state->ctdb;
2304         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2305         struct ctdb_reply_control reply;
2306         struct node *node;
2307
2308         reply.rdata.opcode = request->opcode;
2309
2310         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2311             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2312                 DEBUG(DEBUG_INFO,
2313                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2314                 reply.status = EINVAL;
2315                 reply.errmsg = "Failed to MODIFY_FLAGS";
2316                 client_send_control(req, header, &reply);
2317                 return;
2318         }
2319
2320         /* There's all sorts of broadcast weirdness here.  Only change
2321          * the specified node, not the destination node of the
2322          * control. */
2323         node = &ctdb->node_map->node[change->pnn];
2324
2325         if ((node->flags &
2326              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2327             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2328                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2329                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2330                 goto done;
2331         }
2332
2333         if ((node->flags &
2334              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2335             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2336                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2337                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2338                 goto done;
2339         }
2340
2341         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2342
2343 done:
2344         reply.status = 0;
2345         reply.errmsg = NULL;
2346         client_send_control(req, header, &reply);
2347 }
2348
2349 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2350                                      struct tevent_req *req,
2351                                      struct ctdb_req_header *header,
2352                                      struct ctdb_req_control *request)
2353 {
2354         struct client_state *state = tevent_req_data(
2355                 req, struct client_state);
2356         struct ctdbd_context *ctdb = state->ctdb;
2357         struct ctdb_reply_control reply;
2358
2359         reply.rdata.opcode = request->opcode;
2360         reply.rdata.data.tun_list = &ctdb->tun_list;
2361         reply.status = 0;
2362         reply.errmsg = NULL;
2363
2364         client_send_control(req, header, &reply);
2365 }
2366
2367 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2368                                          struct tevent_req *req,
2369                                          struct ctdb_req_header *header,
2370                                          struct ctdb_req_control *request)
2371 {
2372         struct client_state *state = tevent_req_data(
2373                 req, struct client_state);
2374         struct ctdbd_context *ctdb = state->ctdb;
2375         struct ctdb_reply_control reply;
2376         struct database *db;
2377
2378         reply.rdata.opcode = request->opcode;
2379
2380         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2381                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2382                         goto done;
2383                 }
2384         }
2385
2386         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2387                           CTDB_DB_FLAGS_PERSISTENT);
2388         if (db == NULL) {
2389                 reply.status = -1;
2390                 reply.errmsg = "Failed to attach database";
2391                 client_send_control(req, header, &reply);
2392                 return;
2393         }
2394
2395 done:
2396         reply.rdata.data.db_id = db->id;
2397         reply.status = 0;
2398         reply.errmsg = NULL;
2399         client_send_control(req, header, &reply);
2400 }
2401
2402 static void control_uptime(TALLOC_CTX *mem_ctx,
2403                            struct tevent_req *req,
2404                            struct ctdb_req_header *header,
2405                            struct ctdb_req_control *request)
2406 {
2407         struct client_state *state = tevent_req_data(
2408                 req, struct client_state);
2409         struct ctdbd_context *ctdb = state->ctdb;
2410         struct ctdb_reply_control reply;
2411         struct ctdb_uptime *uptime;;
2412
2413         reply.rdata.opcode = request->opcode;
2414
2415         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2416         if (uptime == NULL) {
2417                 goto fail;
2418         }
2419
2420         uptime->current_time = tevent_timeval_current();
2421         uptime->ctdbd_start_time = ctdb->start_time;
2422         uptime->last_recovery_started = ctdb->recovery_start_time;
2423         uptime->last_recovery_finished = ctdb->recovery_end_time;
2424
2425         reply.rdata.data.uptime = uptime;
2426         reply.status = 0;
2427         reply.errmsg = NULL;
2428         client_send_control(req, header, &reply);
2429         return;
2430
2431 fail:
2432         reply.status = -1;
2433         reply.errmsg = "Memory error";
2434         client_send_control(req, header, &reply);
2435 }
2436
2437 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2438                                       struct tevent_req *req,
2439                                       struct ctdb_req_header *header,
2440                                       struct ctdb_req_control *request)
2441 {
2442         struct client_state *state = tevent_req_data(
2443                 req, struct client_state);
2444         struct ctdbd_context *ctdb = state->ctdb;
2445         struct ctdb_reply_control reply;
2446         struct ctdb_node_map *nodemap;
2447         struct node_map *node_map = ctdb->node_map;
2448         unsigned int i;
2449
2450         reply.rdata.opcode = request->opcode;
2451
2452         nodemap = read_nodes_file(mem_ctx, header->destnode);
2453         if (nodemap == NULL) {
2454                 goto fail;
2455         }
2456
2457         for (i=0; i<nodemap->num; i++) {
2458                 struct node *node;
2459
2460                 if (i < node_map->num_nodes &&
2461                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2462                                         &node_map->node[i].addr)) {
2463                         continue;
2464                 }
2465
2466                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2467                         int ret;
2468
2469                         node = &node_map->node[i];
2470
2471                         node->flags |= NODE_FLAGS_DELETED;
2472                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2473                                                          false);
2474                         if (ret != 0) {
2475                                 /* Can't happen, but Coverity... */
2476                                 goto fail;
2477                         }
2478
2479                         continue;
2480                 }
2481
2482                 if (i < node_map->num_nodes &&
2483                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2484                         node = &node_map->node[i];
2485
2486                         node->flags &= ~NODE_FLAGS_DELETED;
2487                         node->addr = nodemap->node[i].addr;
2488
2489                         continue;
2490                 }
2491
2492                 node_map->node = talloc_realloc(node_map, node_map->node,
2493                                                 struct node,
2494                                                 node_map->num_nodes+1);
2495                 if (node_map->node == NULL) {
2496                         goto fail;
2497                 }
2498                 node = &node_map->node[node_map->num_nodes];
2499
2500                 node->addr = nodemap->node[i].addr;
2501                 node->pnn = nodemap->node[i].pnn;
2502                 node->flags = 0;
2503                 node->capabilities = CTDB_CAP_DEFAULT;
2504                 node->recovery_disabled = false;
2505                 node->recovery_substate = NULL;
2506
2507                 node_map->num_nodes += 1;
2508         }
2509
2510         talloc_free(nodemap);
2511
2512         reply.status = 0;
2513         reply.errmsg = NULL;
2514         client_send_control(req, header, &reply);
2515         return;
2516
2517 fail:
2518         reply.status = -1;
2519         reply.errmsg = "Memory error";
2520         client_send_control(req, header, &reply);
2521 }
2522
2523 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2524                                      struct tevent_req *req,
2525                                      struct ctdb_req_header *header,
2526                                      struct ctdb_req_control *request)
2527 {
2528         struct client_state *state = tevent_req_data(
2529                 req, struct client_state);
2530         struct ctdbd_context *ctdb = state->ctdb;
2531         struct ctdb_reply_control reply;
2532         struct node *node;
2533         uint32_t caps = 0;
2534
2535         reply.rdata.opcode = request->opcode;
2536
2537         node = &ctdb->node_map->node[header->destnode];
2538         caps = node->capabilities;
2539
2540         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2541                 /* Don't send reply */
2542                 return;
2543         }
2544
2545         reply.rdata.data.caps = caps;
2546         reply.status = 0;
2547         reply.errmsg = NULL;
2548
2549         client_send_control(req, header, &reply);
2550 }
2551
2552 static void control_release_ip(TALLOC_CTX *mem_ctx,
2553                                struct tevent_req *req,
2554                                struct ctdb_req_header *header,
2555                                struct ctdb_req_control *request)
2556 {
2557         struct client_state *state = tevent_req_data(
2558                 req, struct client_state);
2559         struct ctdbd_context *ctdb = state->ctdb;
2560         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2561         struct ctdb_reply_control reply;
2562         struct ctdb_public_ip_list *ips = NULL;
2563         struct ctdb_public_ip *t = NULL;
2564         unsigned int i;
2565
2566         reply.rdata.opcode = request->opcode;
2567
2568         if (ctdb->known_ips == NULL) {
2569                 D_INFO("RELEASE_IP %s - not a public IP\n",
2570                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2571                 goto done;
2572         }
2573
2574         ips = &ctdb->known_ips[header->destnode];
2575
2576         t = NULL;
2577         for (i = 0; i < ips->num; i++) {
2578                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2579                         t = &ips->ip[i];
2580                         break;
2581                 }
2582         }
2583         if (t == NULL) {
2584                 D_INFO("RELEASE_IP %s - not a public IP\n",
2585                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2586                 goto done;
2587         }
2588
2589         if (t->pnn != header->destnode) {
2590                 if (header->destnode == ip->pnn) {
2591                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2592                               ctdb_sock_addr_to_string(mem_ctx,
2593                                                        &ip->addr, false),
2594                               ip->pnn);
2595                         reply.status = -1;
2596                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2597                         client_send_control(req, header, &reply);
2598                         return;
2599                 }
2600
2601                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2602                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2603                        ip->pnn);
2604                 t->pnn = ip->pnn;
2605         } else {
2606                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2607                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2608                           ip->pnn);
2609                 t->pnn = ip->pnn;
2610         }
2611
2612 done:
2613         reply.status = 0;
2614         reply.errmsg = NULL;
2615         client_send_control(req, header, &reply);
2616 }
2617
2618 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2619                                 struct tevent_req *req,
2620                                 struct ctdb_req_header *header,
2621                                 struct ctdb_req_control *request)
2622 {
2623         struct client_state *state = tevent_req_data(
2624                 req, struct client_state);
2625         struct ctdbd_context *ctdb = state->ctdb;
2626         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2627         struct ctdb_reply_control reply;
2628         struct ctdb_public_ip_list *ips = NULL;
2629         struct ctdb_public_ip *t = NULL;
2630         unsigned int i;
2631
2632         reply.rdata.opcode = request->opcode;
2633
2634         if (ctdb->known_ips == NULL) {
2635                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2636                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2637                 goto done;
2638         }
2639
2640         ips = &ctdb->known_ips[header->destnode];
2641
2642         t = NULL;
2643         for (i = 0; i < ips->num; i++) {
2644                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2645                         t = &ips->ip[i];
2646                         break;
2647                 }
2648         }
2649         if (t == NULL) {
2650                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2651                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2652                 goto done;
2653         }
2654
2655         if (t->pnn == header->destnode) {
2656                 D_INFO("TAKEOVER_IP %s - redundant\n",
2657                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2658         } else {
2659                 D_NOTICE("TAKEOVER_IP %s\n",
2660                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2661                 t->pnn = ip->pnn;
2662         }
2663
2664 done:
2665         reply.status = 0;
2666         reply.errmsg = NULL;
2667         client_send_control(req, header, &reply);
2668 }
2669
2670 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2671                                    struct tevent_req *req,
2672                                    struct ctdb_req_header *header,
2673                                    struct ctdb_req_control *request)
2674 {
2675         struct client_state *state = tevent_req_data(
2676                 req, struct client_state);
2677         struct ctdbd_context *ctdb = state->ctdb;
2678         struct ctdb_reply_control reply;
2679         struct ctdb_public_ip_list *ips = NULL;
2680
2681         reply.rdata.opcode = request->opcode;
2682
2683         if (ctdb->known_ips == NULL) {
2684                 /* No IPs defined so create a dummy empty struct and ship it */
2685                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2686                 if (ips == NULL) {
2687                         reply.status = ENOMEM;
2688                         reply.errmsg = "Memory error";
2689                         goto done;
2690                 }
2691                 goto ok;
2692         }
2693
2694         ips = &ctdb->known_ips[header->destnode];
2695
2696         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2697                 /* If runstate is not RUNNING or a node is then return
2698                  * no available IPs.  Don't worry about interface
2699                  * states here - we're not faking down to that level.
2700                  */
2701                 uint32_t flags = ctdb->node_map->node[header->destnode].flags;
2702                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
2703                     ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
2704                         /* No available IPs: return dummy empty struct */
2705                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2706                         if (ips == NULL) {
2707                                 reply.status = ENOMEM;
2708                                 reply.errmsg = "Memory error";
2709                                 goto done;
2710                         }
2711                 }
2712         }
2713
2714 ok:
2715         reply.rdata.data.pubip_list = ips;
2716         reply.status = 0;
2717         reply.errmsg = NULL;
2718
2719 done:
2720         client_send_control(req, header, &reply);
2721 }
2722
2723 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2724                                 struct tevent_req *req,
2725                                 struct ctdb_req_header *header,
2726                                 struct ctdb_req_control *request)
2727 {
2728         struct client_state *state = tevent_req_data(
2729                 req, struct client_state);
2730         struct ctdbd_context *ctdb = state->ctdb;
2731         struct ctdb_reply_control reply;
2732         struct ctdb_node_map *nodemap;
2733         struct node *node;
2734         unsigned int i;
2735
2736         reply.rdata.opcode = request->opcode;
2737
2738         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2739         if (nodemap == NULL) {
2740                 goto fail;
2741         }
2742
2743         nodemap->num = ctdb->node_map->num_nodes;
2744         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2745                                      nodemap->num);
2746         if (nodemap->node == NULL) {
2747                 goto fail;
2748         }
2749
2750         for (i=0; i<nodemap->num; i++) {
2751                 node = &ctdb->node_map->node[i];
2752                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2753                         .pnn = node->pnn,
2754                         .flags = node->flags,
2755                         .addr = node->addr,
2756                 };
2757         }
2758
2759         reply.rdata.data.nodemap = nodemap;
2760         reply.status = 0;
2761         reply.errmsg = NULL;
2762         client_send_control(req, header, &reply);
2763         return;
2764
2765 fail:
2766         reply.status = -1;
2767         reply.errmsg = "Memory error";
2768         client_send_control(req, header, &reply);
2769 }
2770
2771 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2772                                      struct tevent_req *req,
2773                                      struct ctdb_req_header *header,
2774                                      struct ctdb_req_control *request)
2775 {
2776         struct client_state *state = tevent_req_data(
2777                 req, struct client_state);
2778         struct ctdbd_context *ctdb = state->ctdb;
2779         struct ctdb_reply_control reply;
2780
2781         reply.rdata.opcode = request->opcode;
2782
2783         if (ctdb->reclock != NULL) {
2784                 reply.rdata.data.reclock_file =
2785                         talloc_strdup(mem_ctx, ctdb->reclock);
2786                 if (reply.rdata.data.reclock_file == NULL) {
2787                         reply.status = ENOMEM;
2788                         reply.errmsg = "Memory error";
2789                         goto done;
2790                 }
2791         } else {
2792                 reply.rdata.data.reclock_file = NULL;
2793         }
2794
2795         reply.status = 0;
2796         reply.errmsg = NULL;
2797
2798 done:
2799         client_send_control(req, header, &reply);
2800 }
2801
2802 static void control_stop_node(TALLOC_CTX *mem_ctx,
2803                               struct tevent_req *req,
2804                               struct ctdb_req_header *header,
2805                               struct ctdb_req_control *request)
2806 {
2807         struct client_state *state = tevent_req_data(
2808                 req, struct client_state);
2809         struct ctdbd_context *ctdb = state->ctdb;
2810         struct ctdb_reply_control reply;
2811
2812         reply.rdata.opcode = request->opcode;
2813
2814         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2815         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2816
2817         reply.status = 0;
2818         reply.errmsg = NULL;
2819
2820         client_send_control(req, header, &reply);
2821         return;
2822 }
2823
2824 static void control_continue_node(TALLOC_CTX *mem_ctx,
2825                                   struct tevent_req *req,
2826                                   struct ctdb_req_header *header,
2827                                   struct ctdb_req_control *request)
2828 {
2829         struct client_state *state = tevent_req_data(
2830                 req, struct client_state);
2831         struct ctdbd_context *ctdb = state->ctdb;
2832         struct ctdb_reply_control reply;
2833
2834         reply.rdata.opcode = request->opcode;
2835
2836         DEBUG(DEBUG_INFO, ("Continue node\n"));
2837         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2838
2839         reply.status = 0;
2840         reply.errmsg = NULL;
2841
2842         client_send_control(req, header, &reply);
2843         return;
2844 }
2845
2846 static void set_ban_state_callback(struct tevent_req *subreq)
2847 {
2848         struct node *node = tevent_req_callback_data(
2849                 subreq, struct node);
2850         bool status;
2851
2852         status = tevent_wakeup_recv(subreq);
2853         TALLOC_FREE(subreq);
2854         if (! status) {
2855                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2856         }
2857
2858         node->flags &= ~NODE_FLAGS_BANNED;
2859 }
2860
2861 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2862                                   struct tevent_req *req,
2863                                   struct ctdb_req_header *header,
2864                                   struct ctdb_req_control *request)
2865 {
2866         struct client_state *state = tevent_req_data(
2867                 req, struct client_state);
2868         struct tevent_req *subreq;
2869         struct ctdbd_context *ctdb = state->ctdb;
2870         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2871         struct ctdb_reply_control reply;
2872         struct node *node;
2873
2874         reply.rdata.opcode = request->opcode;
2875
2876         if (ban->pnn != header->destnode) {
2877                 DEBUG(DEBUG_INFO,
2878                       ("SET_BAN_STATE control for PNN %d rejected\n",
2879                        ban->pnn));
2880                 reply.status = EINVAL;
2881                 goto fail;
2882         }
2883
2884         node = &ctdb->node_map->node[header->destnode];
2885
2886         if (ban->time == 0) {
2887                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2888                 node->flags &= ~NODE_FLAGS_BANNED;
2889                 goto done;
2890         }
2891
2892         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2893                                     tevent_timeval_current_ofs(
2894                                             ban->time, 0));
2895         if (subreq == NULL) {
2896                 reply.status = ENOMEM;
2897                 goto fail;
2898         }
2899         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2900
2901         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2902         node->flags |= NODE_FLAGS_BANNED;
2903         ctdb->vnn_map->generation = INVALID_GENERATION;
2904
2905 done:
2906         reply.status = 0;
2907         reply.errmsg = NULL;
2908
2909         client_send_control(req, header, &reply);
2910         return;
2911
2912 fail:
2913         reply.errmsg = "Failed to ban node";
2914 }
2915
2916 static void control_trans3_commit(TALLOC_CTX *mem_ctx,
2917                                   struct tevent_req *req,
2918                                   struct ctdb_req_header *header,
2919                                   struct ctdb_req_control *request)
2920 {
2921         struct client_state *state = tevent_req_data(
2922                 req, struct client_state);
2923         struct ctdbd_context *ctdb = state->ctdb;
2924         struct ctdb_reply_control reply;
2925         struct database *db;
2926         int ret;
2927
2928         reply.rdata.opcode = request->opcode;
2929
2930         db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
2931         if (db == NULL) {
2932                 reply.status = -1;
2933                 reply.errmsg = "Unknown database";
2934                 client_send_control(req, header, &reply);
2935                 return;
2936         }
2937
2938         if (! (db->flags &
2939                (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
2940                 reply.status = -1;
2941                 reply.errmsg = "Transactions on volatile database";
2942                 client_send_control(req, header, &reply);
2943                 return;
2944         }
2945
2946         ret = ltdb_transaction(db, request->rdata.data.recbuf);
2947         if (ret != 0) {
2948                 reply.status = -1;
2949                 reply.errmsg = "Transaction failed";
2950                 client_send_control(req, header, &reply);
2951                 return;
2952         }
2953
2954         reply.status = 0;
2955         reply.errmsg = NULL;
2956         client_send_control(req, header, &reply);
2957 }
2958
2959 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2960                                struct tevent_req *req,
2961                                struct ctdb_req_header *header,
2962                                struct ctdb_req_control *request)
2963 {
2964         struct client_state *state = tevent_req_data(
2965                 req, struct client_state);
2966         struct ctdbd_context *ctdb = state->ctdb;
2967         struct ctdb_reply_control reply;
2968         struct database *db;
2969         int ret;
2970
2971         reply.rdata.opcode = request->opcode;
2972
2973         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2974         if (db == NULL) {
2975                 reply.status = ENOENT;
2976                 reply.errmsg = "Database not found";
2977         } else {
2978                 uint64_t seqnum;
2979
2980                 ret = database_seqnum(db, &seqnum);
2981                 if (ret == 0) {
2982                         reply.rdata.data.seqnum = seqnum;
2983                         reply.status = 0;
2984                         reply.errmsg = NULL;
2985                 } else {
2986                         reply.status = ret;
2987                         reply.errmsg = "Failed to get seqnum";
2988                 }
2989         }
2990
2991         client_send_control(req, header, &reply);
2992 }
2993
2994 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2995                                   struct tevent_req *req,
2996                                   struct ctdb_req_header *header,
2997                                   struct ctdb_req_control *request)
2998 {
2999         struct client_state *state = tevent_req_data(
3000                 req, struct client_state);
3001         struct ctdbd_context *ctdb = state->ctdb;
3002         struct ctdb_reply_control reply;
3003         struct database *db;
3004
3005         reply.rdata.opcode = request->opcode;
3006
3007         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3008         if (db == NULL) {
3009                 reply.status = ENOENT;
3010                 reply.errmsg = "Database not found";
3011         } else {
3012                 reply.rdata.data.reason = NULL;
3013                 reply.status = 0;
3014                 reply.errmsg = NULL;
3015         }
3016
3017         client_send_control(req, header, &reply);
3018 }
3019
3020 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
3021                                                    struct ctdbd_context *ctdb)
3022 {
3023         struct ctdb_iface_list *iface_list;
3024         struct interface *iface;
3025         unsigned int i;
3026
3027         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
3028         if (iface_list == NULL) {
3029                 goto done;
3030         }
3031
3032         iface_list->num = ctdb->iface_map->num;
3033         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
3034                                          iface_list->num);
3035         if (iface_list->iface == NULL) {
3036                 TALLOC_FREE(iface_list);
3037                 goto done;
3038         }
3039
3040         for (i=0; i<iface_list->num; i++) {
3041                 iface = &ctdb->iface_map->iface[i];
3042                 iface_list->iface[i] = (struct ctdb_iface) {
3043                         .link_state = iface->link_up,
3044                         .references = iface->references,
3045                 };
3046                 strlcpy(iface_list->iface[i].name, iface->name,
3047                         sizeof(iface_list->iface[i].name));
3048         }
3049
3050 done:
3051         return iface_list;
3052 }
3053
3054 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
3055                                        struct tevent_req *req,
3056                                        struct ctdb_req_header *header,
3057                                        struct ctdb_req_control *request)
3058 {
3059         struct client_state *state = tevent_req_data(
3060                 req, struct client_state);
3061         struct ctdbd_context *ctdb = state->ctdb;
3062         struct ctdb_reply_control reply;
3063         ctdb_sock_addr *addr = request->rdata.data.addr;
3064         struct ctdb_public_ip_list *known = NULL;
3065         struct ctdb_public_ip_info *info = NULL;
3066         unsigned i;
3067
3068         reply.rdata.opcode = request->opcode;
3069
3070         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
3071         if (info == NULL) {
3072                 reply.status = ENOMEM;
3073                 reply.errmsg = "Memory error";
3074                 goto done;
3075         }
3076
3077         reply.rdata.data.ipinfo = info;
3078
3079         if (ctdb->known_ips != NULL) {
3080                 known = &ctdb->known_ips[header->destnode];
3081         } else {
3082                 /* No IPs defined so create a dummy empty struct and
3083                  * fall through.  The given IP won't be matched
3084                  * below...
3085                  */
3086                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
3087                 if (known == NULL) {
3088                         reply.status = ENOMEM;
3089                         reply.errmsg = "Memory error";
3090                         goto done;
3091                 }
3092         }
3093
3094         for (i = 0; i < known->num; i++) {
3095                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
3096                                            addr)) {
3097                         break;
3098                 }
3099         }
3100
3101         if (i == known->num) {
3102                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
3103                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
3104                 reply.status = -1;
3105                 reply.errmsg = "Unknown address";
3106                 goto done;
3107         }
3108
3109         info->ip = known->ip[i];
3110
3111         /* The fake PUBLICIPS stanza and resulting known_ips data
3112          * don't know anything about interfaces, so completely fake
3113          * this.
3114          */
3115         info->active_idx = 0;
3116
3117         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
3118         if (info->ifaces == NULL) {
3119                 reply.status = ENOMEM;
3120                 reply.errmsg = "Memory error";
3121                 goto done;
3122         }
3123
3124         reply.status = 0;
3125         reply.errmsg = NULL;
3126
3127 done:
3128         client_send_control(req, header, &reply);
3129 }
3130
3131 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
3132                                struct tevent_req *req,
3133                                struct ctdb_req_header *header,
3134                                struct ctdb_req_control *request)
3135 {
3136         struct client_state *state = tevent_req_data(
3137                 req, struct client_state);
3138         struct ctdbd_context *ctdb = state->ctdb;
3139         struct ctdb_reply_control reply;
3140         struct ctdb_iface_list *iface_list;
3141
3142         reply.rdata.opcode = request->opcode;
3143
3144         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
3145         if (iface_list == NULL) {
3146                 goto fail;
3147         }
3148
3149         reply.rdata.data.iface_list = iface_list;
3150         reply.status = 0;
3151         reply.errmsg = NULL;
3152         client_send_control(req, header, &reply);
3153         return;
3154
3155 fail:
3156         reply.status = -1;
3157         reply.errmsg = "Memory error";
3158         client_send_control(req, header, &reply);
3159 }
3160
3161 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
3162                                          struct tevent_req *req,
3163                                          struct ctdb_req_header *header,
3164                                          struct ctdb_req_control *request)
3165 {
3166         struct client_state *state = tevent_req_data(
3167                 req, struct client_state);
3168         struct ctdbd_context *ctdb = state->ctdb;
3169         struct ctdb_reply_control reply;
3170         struct ctdb_iface *in_iface;
3171         struct interface *iface = NULL;
3172         bool link_up = false;
3173         int i;
3174
3175         reply.rdata.opcode = request->opcode;
3176
3177         in_iface = request->rdata.data.iface;
3178
3179         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
3180                 reply.errmsg = "interface name not terminated";
3181                 goto fail;
3182         }
3183
3184         switch (in_iface->link_state) {
3185                 case 0:
3186                         link_up = false;
3187                         break;
3188
3189                 case 1:
3190                         link_up = true;
3191                         break;
3192
3193                 default:
3194                         reply.errmsg = "invalid link state";
3195                         goto fail;
3196         }
3197
3198         if (in_iface->references != 0) {
3199                 reply.errmsg = "references should be 0";
3200                 goto fail;
3201         }
3202
3203         for (i=0; i<ctdb->iface_map->num; i++) {
3204                 if (strcmp(ctdb->iface_map->iface[i].name,
3205                            in_iface->name) == 0) {
3206                         iface = &ctdb->iface_map->iface[i];
3207                         break;
3208                 }
3209         }
3210
3211         if (iface == NULL) {
3212                 reply.errmsg = "interface not found";
3213                 goto fail;
3214         }
3215
3216         iface->link_up = link_up;
3217
3218         reply.status = 0;
3219         reply.errmsg = NULL;
3220         client_send_control(req, header, &reply);
3221         return;
3222
3223 fail:
3224         reply.status = -1;
3225         client_send_control(req, header, &reply);
3226 }
3227
3228 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
3229                                     struct tevent_req *req,
3230                                     struct ctdb_req_header *header,
3231                                     struct ctdb_req_control *request)
3232 {
3233         struct client_state *state = tevent_req_data(
3234                 req, struct client_state);
3235         struct ctdbd_context *ctdb = state->ctdb;
3236         struct ctdb_reply_control reply;
3237         struct database *db;
3238
3239         reply.rdata.opcode = request->opcode;
3240
3241         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3242         if (db == NULL) {
3243                 reply.status = ENOENT;
3244                 reply.errmsg = "Database not found";
3245                 goto done;
3246         }
3247
3248         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3249                 reply.status = EINVAL;
3250                 reply.errmsg = "Can not set READONLY on persistent db";
3251                 goto done;
3252         }
3253
3254         db->flags |= CTDB_DB_FLAGS_READONLY;
3255         reply.status = 0;
3256         reply.errmsg = NULL;
3257
3258 done:
3259         client_send_control(req, header, &reply);
3260 }
3261
3262 struct traverse_start_ext_state {
3263         struct tevent_req *req;
3264         struct ctdb_req_header *header;
3265         uint32_t reqid;
3266         uint64_t srvid;
3267         bool withemptyrecords;
3268         int status;
3269 };
3270
3271 static int traverse_start_ext_handler(struct tdb_context *tdb,
3272                                       TDB_DATA key, TDB_DATA data,
3273                                       void *private_data)
3274 {
3275         struct traverse_start_ext_state *state =
3276                 (struct traverse_start_ext_state *)private_data;
3277         struct ctdb_rec_data rec;
3278         struct ctdb_req_message_data message;
3279         size_t np;
3280
3281         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3282                 return 0;
3283         }
3284
3285         if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
3286             (!state->withemptyrecords)) {
3287                 return 0;
3288         }
3289
3290         rec = (struct ctdb_rec_data) {
3291                 .reqid = state->reqid,
3292                 .header = NULL,
3293                 .key = key,
3294                 .data = data,
3295         };
3296
3297         message.srvid = state->srvid;
3298         message.data.dsize = ctdb_rec_data_len(&rec);
3299         message.data.dptr = talloc_size(state->req, message.data.dsize);
3300         if (message.data.dptr == NULL) {
3301                 state->status = ENOMEM;
3302                 return 1;
3303         }
3304
3305         ctdb_rec_data_push(&rec, message.data.dptr, &np);
3306         client_send_message(state->req, state->header, &message);
3307
3308         talloc_free(message.data.dptr);
3309
3310         return 0;
3311 }
3312
3313 static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
3314                                        struct tevent_req *req,
3315                                        struct ctdb_req_header *header,
3316                                        struct ctdb_req_control *request)
3317 {
3318         struct client_state *state = tevent_req_data(
3319                 req, struct client_state);
3320         struct ctdbd_context *ctdb = state->ctdb;
3321         struct ctdb_reply_control reply;
3322         struct database *db;
3323         struct ctdb_traverse_start_ext *ext;
3324         struct traverse_start_ext_state t_state;
3325         struct ctdb_rec_data rec;
3326         struct ctdb_req_message_data message;
3327         uint8_t buffer[32];
3328         size_t np;
3329         int ret;
3330
3331         reply.rdata.opcode = request->opcode;
3332
3333         ext = request->rdata.data.traverse_start_ext;
3334
3335         db = database_find(ctdb->db_map, ext->db_id);
3336         if (db == NULL) {
3337                 reply.status = -1;
3338                 reply.errmsg = "Unknown database";
3339                 client_send_control(req, header, &reply);
3340                 return;
3341         }
3342
3343         t_state = (struct traverse_start_ext_state) {
3344                 .req = req,
3345                 .header = header,
3346                 .reqid = ext->reqid,
3347                 .srvid = ext->srvid,
3348                 .withemptyrecords = ext->withemptyrecords,
3349         };
3350
3351         ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
3352         DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
3353         if (t_state.status != 0) {
3354                 reply.status = -1;
3355                 reply.errmsg = "Memory error";
3356                 client_send_control(req, header, &reply);
3357         }
3358
3359         reply.status = 0;
3360         client_send_control(req, header, &reply);
3361
3362         rec = (struct ctdb_rec_data) {
3363                 .reqid = ext->reqid,
3364                 .header = NULL,
3365                 .key = tdb_null,
3366                 .data = tdb_null,
3367         };
3368
3369         message.srvid = ext->srvid;
3370         message.data.dsize = ctdb_rec_data_len(&rec);
3371         ctdb_rec_data_push(&rec, buffer, &np);
3372         message.data.dptr = buffer;
3373         client_send_message(req, header, &message);
3374 }
3375
3376 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3377                                     struct tevent_req *req,
3378                                     struct ctdb_req_header *header,
3379                                     struct ctdb_req_control *request)
3380 {
3381         struct client_state *state = tevent_req_data(
3382                 req, struct client_state);
3383         struct ctdbd_context *ctdb = state->ctdb;
3384         struct ctdb_reply_control reply;
3385         struct database *db;
3386
3387         reply.rdata.opcode = request->opcode;
3388
3389         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3390         if (db == NULL) {
3391                 reply.status = ENOENT;
3392                 reply.errmsg = "Database not found";
3393                 goto done;
3394         }
3395
3396         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3397                 reply.status = EINVAL;
3398                 reply.errmsg = "Can not set STICKY on persistent db";
3399                 goto done;
3400         }
3401
3402         db->flags |= CTDB_DB_FLAGS_STICKY;
3403         reply.status = 0;
3404         reply.errmsg = NULL;
3405
3406 done:
3407         client_send_control(req, header, &reply);
3408 }
3409
3410 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3411                                   struct tevent_req *req,
3412                                   struct ctdb_req_header *header,
3413                                   struct ctdb_req_control *request)
3414 {
3415         struct ctdb_reply_control reply;
3416
3417         /* Always succeed */
3418         reply.rdata.opcode = request->opcode;
3419         reply.status = 0;
3420         reply.errmsg = NULL;
3421
3422         client_send_control(req, header, &reply);
3423 }
3424
3425 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3426                                  struct tevent_req *req,
3427                                  struct ctdb_req_header *header,
3428                                  struct ctdb_req_control *request)
3429 {
3430         struct client_state *state = tevent_req_data(
3431                 req, struct client_state);
3432         struct ctdbd_context *ctdb = state->ctdb;
3433         struct ctdb_reply_control reply;
3434
3435         reply.rdata.opcode = request->opcode;
3436         reply.rdata.data.runstate = ctdb->runstate;
3437         reply.status = 0;
3438         reply.errmsg = NULL;
3439
3440         client_send_control(req, header, &reply);
3441 }
3442
3443 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3444                                    struct tevent_req *req,
3445                                    struct ctdb_req_header *header,
3446                                    struct ctdb_req_control *request)
3447 {
3448         struct ctdb_reply_control reply;
3449         struct ctdb_node_map *nodemap;
3450
3451         reply.rdata.opcode = request->opcode;
3452
3453         nodemap = read_nodes_file(mem_ctx, header->destnode);
3454         if (nodemap == NULL) {
3455                 goto fail;
3456         }
3457
3458         reply.rdata.data.nodemap = nodemap;
3459         reply.status = 0;
3460         reply.errmsg = NULL;
3461         client_send_control(req, header, &reply);
3462         return;
3463
3464 fail:
3465         reply.status = -1;
3466         reply.errmsg = "Failed to read nodes file";
3467         client_send_control(req, header, &reply);
3468 }
3469
3470 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3471                                   struct tevent_req *req,
3472                                   struct ctdb_req_header *header,
3473                                   struct ctdb_req_control *request)
3474 {
3475         struct client_state *state = tevent_req_data(
3476                 req, struct client_state);
3477         struct ctdbd_context *ctdb = state->ctdb;
3478         struct ctdb_reply_control reply;
3479         struct database *db;
3480
3481         reply.rdata.opcode = request->opcode;
3482
3483         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3484         if (db == NULL) {
3485                 reply.status = ENOENT;
3486                 reply.errmsg = "Database not found";
3487         } else {
3488                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3489                 reply.status = 0;
3490                 reply.errmsg = NULL;
3491         }
3492
3493         client_send_control(req, header, &reply);
3494 }
3495
3496 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3497                                          struct tevent_req *req,
3498                                          struct ctdb_req_header *header,
3499                                          struct ctdb_req_control *request)
3500 {
3501         struct client_state *state = tevent_req_data(
3502                 req, struct client_state);
3503         struct ctdbd_context *ctdb = state->ctdb;
3504         struct ctdb_reply_control reply;
3505         struct database *db;
3506
3507         reply.rdata.opcode = request->opcode;
3508
3509         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3510                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3511                         goto done;
3512                 }
3513         }
3514
3515         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3516                           CTDB_DB_FLAGS_REPLICATED);
3517         if (db == NULL) {
3518                 reply.status = -1;
3519                 reply.errmsg = "Failed to attach database";
3520                 client_send_control(req, header, &reply);
3521                 return;
3522         }
3523
3524 done:
3525         reply.rdata.data.db_id = db->id;
3526         reply.status = 0;
3527         reply.errmsg = NULL;
3528         client_send_control(req, header, &reply);
3529 }
3530
3531 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3532                                     struct tevent_req *req,
3533                                     struct ctdb_req_header *header,
3534                                     struct ctdb_req_control *request)
3535 {
3536         struct client_state *state = tevent_req_data(
3537                 req, struct client_state);
3538         struct ctdbd_context *ctdb = state->ctdb;
3539         struct ctdb_client *client;
3540         struct client_state *cstate;
3541         struct ctdb_reply_control reply;
3542         bool pid_found, srvid_found;
3543         int ret;
3544
3545         reply.rdata.opcode = request->opcode;
3546
3547         pid_found = false;
3548         srvid_found = false;
3549
3550         for (client=ctdb->client_list; client != NULL; client=client->next) {
3551                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3552                         pid_found = true;
3553                         cstate = (struct client_state *)client->state;
3554                         ret = srvid_exists(ctdb->srv,
3555                                            request->rdata.data.pid_srvid->srvid,
3556                                            cstate);
3557                         if (ret == 0) {
3558                                 srvid_found = true;
3559                                 ret = kill(cstate->pid, 0);
3560                                 if (ret != 0) {
3561                                         reply.status = ret;
3562                                         reply.errmsg = strerror(errno);
3563                                 } else {
3564                                         reply.status = 0;
3565                                         reply.errmsg = NULL;
3566                                 }
3567                         }
3568                 }
3569         }
3570
3571         if (! pid_found) {
3572                 reply.status = -1;
3573                 reply.errmsg = "No client for PID";
3574         } else if (! srvid_found) {
3575                 reply.status = -1;
3576                 reply.errmsg = "No client for PID and SRVID";
3577         }
3578
3579         client_send_control(req, header, &reply);
3580 }
3581
3582 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3583                                  struct tevent_req *req,
3584                                  struct ctdb_req_header *header,
3585                                  struct ctdb_req_control *request)
3586 {
3587         struct client_state *state = tevent_req_data(
3588                 req, struct client_state);
3589         struct ctdbd_context *ctdb = state->ctdb;
3590         struct ctdb_reply_control reply;
3591         struct fake_control_failure *f = NULL;
3592
3593         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3594                 request->opcode, header->destnode);
3595         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3596                 if (f->opcode == request->opcode &&
3597                     (f->pnn == header->destnode ||
3598                      f->pnn == CTDB_UNKNOWN_PNN)) {
3599
3600                         reply.rdata.opcode = request->opcode;
3601                         if (strcmp(f->error, "TIMEOUT") == 0) {
3602                                 /* Causes no reply */
3603                                 D_ERR("Control %u fake timeout on node %u\n",
3604                                       request->opcode, header->destnode);
3605                                 return true;
3606                         } else if (strcmp(f->error, "ERROR") == 0) {
3607                                 D_ERR("Control %u fake error on node %u\n",
3608                                       request->opcode, header->destnode);
3609                                 reply.status = -1;
3610                                 reply.errmsg = f->comment;
3611                                 client_send_control(req, header, &reply);
3612                                 return true;
3613                         }
3614                 }
3615         }
3616
3617         return false;
3618 }
3619
3620 static void control_error(TALLOC_CTX *mem_ctx,
3621                           struct tevent_req *req,
3622                           struct ctdb_req_header *header,
3623                           struct ctdb_req_control *request)
3624 {
3625         struct ctdb_reply_control reply;
3626
3627         D_DEBUG("Control %u not implemented\n", request->opcode);
3628
3629         reply.rdata.opcode = request->opcode;
3630         reply.status = -1;
3631         reply.errmsg = "Not implemented";
3632
3633         client_send_control(req, header, &reply);
3634 }
3635
3636 /*
3637  * Handling protocol - messages
3638  */
3639
3640 struct disable_recoveries_state {
3641         struct node *node;
3642 };
3643
3644 static void disable_recoveries_callback(struct tevent_req *subreq)
3645 {
3646         struct disable_recoveries_state *substate = tevent_req_callback_data(
3647                 subreq, struct disable_recoveries_state);
3648         bool status;
3649
3650         status = tevent_wakeup_recv(subreq);
3651         TALLOC_FREE(subreq);
3652         if (! status) {
3653                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3654         }
3655
3656         substate->node->recovery_disabled = false;
3657         TALLOC_FREE(substate->node->recovery_substate);
3658 }
3659
3660 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3661                                        struct tevent_req *req,
3662                                        struct ctdb_req_header *header,
3663                                        struct ctdb_req_message *request)
3664 {
3665         struct client_state *state = tevent_req_data(
3666                 req, struct client_state);
3667         struct tevent_req *subreq;
3668         struct ctdbd_context *ctdb = state->ctdb;
3669         struct disable_recoveries_state *substate;
3670         struct ctdb_disable_message *disable = request->data.disable;
3671         struct ctdb_req_message_data reply;
3672         struct node *node;
3673         int ret = -1;
3674         TDB_DATA data;
3675
3676         node = &ctdb->node_map->node[header->destnode];
3677
3678         if (disable->timeout == 0) {
3679                 TALLOC_FREE(node->recovery_substate);
3680                 node->recovery_disabled = false;
3681                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3682                                    header->destnode));
3683                 goto done;
3684         }
3685
3686         substate = talloc_zero(ctdb->node_map,
3687                                struct disable_recoveries_state);
3688         if (substate == NULL) {
3689                 goto fail;
3690         }
3691
3692         substate->node = node;
3693
3694         subreq = tevent_wakeup_send(substate, state->ev,
3695                                     tevent_timeval_current_ofs(
3696                                             disable->timeout, 0));
3697         if (subreq == NULL) {
3698                 talloc_free(substate);
3699                 goto fail;
3700         }
3701         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3702
3703         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3704                            disable->timeout, header->destnode));
3705         node->recovery_substate = substate;
3706         node->recovery_disabled = true;
3707
3708 done:
3709         ret = header->destnode;
3710
3711 fail:
3712         reply.srvid = disable->srvid;
3713         data.dptr = (uint8_t *)&ret;
3714         data.dsize = sizeof(int);
3715         reply.data = data;
3716
3717         client_send_message(req, header, &reply);
3718 }
3719
3720 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3721                                  struct tevent_req *req,
3722                                  struct ctdb_req_header *header,
3723                                  struct ctdb_req_message *request)
3724 {
3725         struct client_state *state = tevent_req_data(
3726                 req, struct client_state);
3727         struct ctdbd_context *ctdb = state->ctdb;
3728         struct ctdb_srvid_message *srvid = request->data.msg;
3729         struct ctdb_req_message_data reply;
3730         int ret = -1;
3731         TDB_DATA data;
3732
3733         if (header->destnode != ctdb->node_map->recmaster) {
3734                 /* No reply! Only recmaster replies... */
3735                 return;
3736         }
3737
3738         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3739                            header->destnode));
3740         ret = header->destnode;
3741
3742         reply.srvid = srvid->srvid;
3743         data.dptr = (uint8_t *)&ret;
3744         data.dsize = sizeof(int);
3745         reply.data = data;
3746
3747         client_send_message(req, header, &reply);
3748 }
3749
3750 /*
3751  * Handle a single client
3752  */
3753
3754 static void client_read_handler(uint8_t *buf, size_t buflen,
3755                                 void *private_data);
3756 static void client_dead_handler(void *private_data);
3757 static void client_process_packet(struct tevent_req *req,
3758                                   uint8_t *buf, size_t buflen);
3759 static void client_process_call(struct tevent_req *req,
3760                                 uint8_t *buf, size_t buflen);
3761 static void client_process_message(struct tevent_req *req,
3762                                    uint8_t *buf, size_t buflen);
3763 static void client_process_control(struct tevent_req *req,
3764                                    uint8_t *buf, size_t buflen);
3765 static void client_reply_done(struct tevent_req *subreq);
3766
3767 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3768                                       struct tevent_context *ev,
3769                                       int fd, struct ctdbd_context *ctdb,
3770                                       int pnn)
3771 {
3772         struct tevent_req *req;
3773         struct client_state *state;
3774         int ret;
3775
3776         req = tevent_req_create(mem_ctx, &state, struct client_state);
3777         if (req == NULL) {
3778                 return NULL;
3779         }
3780
3781         state->ev = ev;
3782         state->fd = fd;
3783         state->ctdb = ctdb;
3784         state->pnn = pnn;
3785
3786