a569b0686c3479ffa7ca04f0ea0321d2f92bce4f
[samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
35
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
40
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45 #include "common/system.h"
46
47 #include "ipalloc_read_known_ips.h"
48
49
50 #define CTDB_PORT 4379
51
52 /* A fake flag that is only supported by some functions */
53 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
54
/* One cluster node as described by a line of the nodemap input */
struct node {
        /* Node address; nodemap_parse() sets the port to CTDB_PORT */
        ctdb_sock_addr addr;
        /* Node number taken from the first column of the input line */
        uint32_t pnn;
        /* Flag bits from the input; nodemap_parse() tests
         * NODE_FLAGS_DELETED and may OR in NODE_FLAGS_FAKE_TIMEOUT */
        uint32_t flags;
        /* CTDB_CAP_* bits; defaults to RECMASTER|LMASTER when parsed */
        uint32_t capabilities;
        /* Set to false by nodemap_parse() */
        bool recovery_disabled;
        /* Set to NULL by nodemap_parse(); owner not visible here */
        void *recovery_substate;
};
63
/* Growable array of nodes plus the two "special" node numbers */
struct node_map {
        /* Number of valid entries in node[] */
        uint32_t num_nodes;
        /* Array of nodes, grown one entry at a time by nodemap_parse() */
        struct node *node;
        /* PNN of the line tagged CURRENT; CTDB_UNKNOWN_PNN until set */
        uint32_t pnn;
        /* PNN of the line tagged RECMASTER; CTDB_UNKNOWN_PNN until set */
        uint32_t recmaster;
};
70
/* One network interface, as read from ":Name:LinkStatus:References:" input */
struct interface {
        /* Interface name (first field of the input line) */
        const char *name;
        /* True when the parsed link status field is non-zero */
        bool link_up;
        /* Reference count (third field of the input line) */
        uint32_t references;
};
76
/* Growable array of interfaces */
struct interface_map {
        /* Number of valid entries in iface[] */
        int num;
        /* Array of interfaces, grown by interfaces_parse() */
        struct interface *iface;
};
81
/* VNN map state: recovery mode, generation and the list of map entries */
struct vnn_map {
        /* Initialised to CTDB_RECOVERY_ACTIVE by vnnmap_init() */
        uint32_t recmode;
        /* INVALID_GENERATION until the first input line has been read */
        uint32_t generation;
        /* Number of valid entries in map[] */
        uint32_t size;
        /* One entry per input line following the generation */
        uint32_t *map;
};
88
/* One database known to the fake daemon; kept on a doubly linked list */
struct database {
        /* List links, maintained via DLIST_ADD_END() */
        struct database *prev, *next;
        /* Database name */
        const char *name;
        /* "<dbdir>/<name>" */
        const char *path;
        /* Open TDB handle; stays NULL for databases created by
         * dbmap_parse(), set by database_new() */
        struct tdb_context *tdb;
        /* Database ID: read from input, or a hash of the name in
         * database_new() */
        uint32_t id;
        /* CTDB_DB_FLAGS_* bits */
        uint8_t flags;
        /* Sequence number from the dbmap input; reported by
         * database_seqnum() when no TDB is attached */
        uint64_t seq_num;
};
98
/* Collection of databases and the directory their files live in */
struct database_map {
        /* Head of the database list */
        struct database *db;
        /* Directory prefix used when building each database path */
        const char *dbdir;
};
103
/*
 * Linked-list entry keyed by control opcode and PNN.
 *
 * NOTE(review): presumably describes a control that should be made to
 * fail for testing - the code using it is not in this part of the
 * file, confirm there.
 */
struct fake_control_failure {
        /* List links */
        struct fake_control_failure  *prev, *next;
        /* Control this entry applies to */
        enum ctdb_controls opcode;
        /* Node number this entry applies to */
        uint32_t pnn;
        /* Error description string; exact semantics not visible here */
        const char *error;
        /* Free-form comment string */
        const char *comment;
};
111
/* One entry of the daemon's client list (see ctdbd_context.client_list) */
struct ctdb_client {
        /* List links */
        struct ctdb_client *prev, *next;
        /* Back pointer to the owning daemon context */
        struct ctdbd_context *ctdb;
        /* Process ID associated with the client */
        pid_t pid;
        /* Opaque per-client state; type not visible in this part of
         * the file */
        void *state;
};
118
/* Top-level state of the fake CTDB daemon */
struct ctdbd_context {
        /* Cluster nodes (see nodemap_parse()) */
        struct node_map *node_map;
        /* Network interfaces (see interfaces_parse()) */
        struct interface_map *iface_map;
        /* VNN map (see vnnmap_parse()) */
        struct vnn_map *vnn_map;
        /* Databases (see dbmap_parse() and database_new()) */
        struct database_map *db_map;
        /* Message (srvid) handler registry */
        struct srvid_context *srv;
        /* Client counter */
        int num_clients;
        /* Timestamps; where they are set is not visible in this part
         * of the file */
        struct timeval start_time;
        struct timeval recovery_start_time;
        struct timeval recovery_end_time;
        bool takeover_disabled;
        int log_level;
        enum ctdb_runstate runstate;
        /* Tunable values */
        struct ctdb_tunable_list tun_list;
        /* Recovery lock string, set by reclock_parse() */
        char *reclock;
        struct ctdb_public_ip_list *known_ips;
        /* Head of the fake_control_failure list */
        struct fake_control_failure *control_failures;
        /* Head of the ctdb_client list */
        struct ctdb_client *client_list;
};
138
139 /*
140  * Parse routines
141  */
142
143 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
144 {
145         struct node_map *node_map;
146
147         node_map = talloc_zero(mem_ctx, struct node_map);
148         if (node_map == NULL) {
149                 return NULL;
150         }
151
152         node_map->pnn = CTDB_UNKNOWN_PNN;
153         node_map->recmaster = CTDB_UNKNOWN_PNN;
154
155         return node_map;
156 }
157
158 /* Read a nodemap from stdin.  Each line looks like:
159  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
160  * EOF or a blank line terminates input.
161  *
162  * By default, capabilities for each node are
163  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
164  * capabilities can be faked off by adding, for example,
165  * -CTDB_CAP_RECMASTER.
166  */
167
168 static bool nodemap_parse(struct node_map *node_map)
169 {
170         char line[1024];
171
172         while ((fgets(line, sizeof(line), stdin) != NULL)) {
173                 uint32_t pnn, flags, capabilities;
174                 char *tok, *t;
175                 char *ip;
176                 ctdb_sock_addr saddr;
177                 struct node *node;
178                 int ret;
179
180                 if (line[0] == '\n') {
181                         break;
182                 }
183
184                 /* Get rid of pesky newline */
185                 if ((t = strchr(line, '\n')) != NULL) {
186                         *t = '\0';
187                 }
188
189                 /* Get PNN */
190                 tok = strtok(line, " \t");
191                 if (tok == NULL) {
192                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
193                         continue;
194                 }
195                 pnn = (uint32_t)strtoul(tok, NULL, 0);
196
197                 /* Get IP */
198                 tok = strtok(NULL, " \t");
199                 if (tok == NULL) {
200                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
201                         continue;
202                 }
203                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
204                 if (ret != 0) {
205                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
206                         continue;
207                 }
208                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
209                 ip = talloc_strdup(node_map, tok);
210                 if (ip == NULL) {
211                         goto fail;
212                 }
213
214                 /* Get flags */
215                 tok = strtok(NULL, " \t");
216                 if (tok == NULL) {
217                         fprintf(stderr, "bad line (%s) - missing flags\n",
218                                 line);
219                         continue;
220                 }
221                 flags = (uint32_t)strtoul(tok, NULL, 0);
222                 /* Handle deleted nodes */
223                 if (flags & NODE_FLAGS_DELETED) {
224                         talloc_free(ip);
225                         ip = talloc_strdup(node_map, "0.0.0.0");
226                         if (ip == NULL) {
227                                 goto fail;
228                         }
229                 }
230                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
231
232                 tok = strtok(NULL, " \t");
233                 while (tok != NULL) {
234                         if (strcmp(tok, "CURRENT") == 0) {
235                                 node_map->pnn = pnn;
236                         } else if (strcmp(tok, "RECMASTER") == 0) {
237                                 node_map->recmaster = pnn;
238                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
239                                 capabilities &= ~CTDB_CAP_RECMASTER;
240                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
241                                 capabilities &= ~CTDB_CAP_LMASTER;
242                         } else if (strcmp(tok, "TIMEOUT") == 0) {
243                                 /* This can be done with just a flag
244                                  * value but it is probably clearer
245                                  * and less error-prone to fake this
246                                  * with an explicit token */
247                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
248                         }
249                         tok = strtok(NULL, " \t");
250                 }
251
252                 node_map->node = talloc_realloc(node_map, node_map->node,
253                                                 struct node,
254                                                 node_map->num_nodes + 1);
255                 if (node_map->node == NULL) {
256                         goto fail;
257                 }
258                 node = &node_map->node[node_map->num_nodes];
259
260                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
261                 if (ret != 0) {
262                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
263                         continue;
264                 }
265                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
266                 node->pnn = pnn;
267                 node->flags = flags;
268                 node->capabilities = capabilities;
269                 node->recovery_disabled = false;
270                 node->recovery_substate = NULL;
271
272                 node_map->num_nodes += 1;
273         }
274
275         if (node_map->num_nodes == 0) {
276                 goto fail;
277         }
278
279         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
280         return true;
281
282 fail:
283         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
284         return false;
285
286 }
287
288 /* Append a node to a node map with given address and flags */
289 static bool node_map_add(struct ctdb_node_map *nodemap,
290                          const char *nstr, uint32_t flags)
291 {
292         ctdb_sock_addr addr;
293         uint32_t num;
294         struct ctdb_node_and_flags *n;
295         int ret;
296
297         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
298         if (ret != 0) {
299                 fprintf(stderr, "Invalid IP address %s\n", nstr);
300                 return false;
301         }
302         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
303
304         num = nodemap->num;
305         nodemap->node = talloc_realloc(nodemap, nodemap->node,
306                                        struct ctdb_node_and_flags, num+1);
307         if (nodemap->node == NULL) {
308                 return false;
309         }
310
311         n = &nodemap->node[num];
312         n->addr = addr;
313         n->pnn = num;
314         n->flags = flags;
315
316         nodemap->num = num+1;
317         return true;
318 }
319
320 /* Read a nodes file into a node map */
321 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
322                                                   const char *nlist)
323 {
324         char **lines;
325         int nlines;
326         int i;
327         struct ctdb_node_map *nodemap;
328
329         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
330         if (nodemap == NULL) {
331                 return NULL;
332         }
333
334         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
335         if (lines == NULL) {
336                 return NULL;
337         }
338
339         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
340                 nlines--;
341         }
342
343         for (i=0; i<nlines; i++) {
344                 char *node;
345                 uint32_t flags;
346                 size_t len;
347
348                 node = lines[i];
349                 /* strip leading spaces */
350                 while((*node == ' ') || (*node == '\t')) {
351                         node++;
352                 }
353
354                 len = strlen(node);
355
356                 /* strip trailing spaces */
357                 while ((len > 1) &&
358                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
359                 {
360                         node[len-1] = '\0';
361                         len--;
362                 }
363
364                 if (len == 0) {
365                         continue;
366                 }
367                 if (*node == '#') {
368                         /* A "deleted" node is a node that is
369                            commented out in the nodes file.  This is
370                            used instead of removing a line, which
371                            would cause subsequent nodes to change
372                            their PNN. */
373                         flags = NODE_FLAGS_DELETED;
374                         node = discard_const("0.0.0.0");
375                 } else {
376                         flags = 0;
377                 }
378                 if (! node_map_add(nodemap, node, flags)) {
379                         talloc_free(lines);
380                         TALLOC_FREE(nodemap);
381                         return NULL;
382                 }
383         }
384
385         talloc_free(lines);
386         return nodemap;
387 }
388
389 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
390                                              uint32_t pnn)
391 {
392         struct ctdb_node_map *nodemap;
393         char nodes_list[PATH_MAX];
394         const char *ctdb_base;
395         int num;
396
397         ctdb_base = getenv("CTDB_BASE");
398         if (ctdb_base == NULL) {
399                 D_ERR("CTDB_BASE is not set\n");
400                 return NULL;
401         }
402
403         /* read optional node-specific nodes file */
404         num = snprintf(nodes_list, sizeof(nodes_list),
405                        "%s/nodes.%d", ctdb_base, pnn);
406         if (num == sizeof(nodes_list)) {
407                 D_ERR("nodes file path too long\n");
408                 return NULL;
409         }
410         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
411         if (nodemap != NULL) {
412                 /* Fake a load failure for an empty nodemap */
413                 if (nodemap->num == 0) {
414                         talloc_free(nodemap);
415
416                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
417                         return NULL;
418                 }
419
420                 return nodemap;
421         }
422
423         /* read normal nodes file */
424         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
425         if (num == sizeof(nodes_list)) {
426                 D_ERR("nodes file path too long\n");
427                 return NULL;
428         }
429         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
430         if (nodemap != NULL) {
431                 return nodemap;
432         }
433
434         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
435         return NULL;
436 }
437
438 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
439 {
440         struct interface_map *iface_map;
441
442         iface_map = talloc_zero(mem_ctx, struct interface_map);
443         if (iface_map == NULL) {
444                 return NULL;
445         }
446
447         return iface_map;
448 }
449
450 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
451  * output:
452  *   :Name:LinkStatus:References:
453  *   :eth2:1:4294967294
454  *   :eth1:1:4294967292
455  */
456
457 static bool interfaces_parse(struct interface_map *iface_map)
458 {
459         char line[1024];
460
461         while ((fgets(line, sizeof(line), stdin) != NULL)) {
462                 uint16_t link_state;
463                 uint32_t references;
464                 char *tok, *t, *name;
465                 struct interface *iface;
466
467                 if (line[0] == '\n') {
468                         break;
469                 }
470
471                 /* Get rid of pesky newline */
472                 if ((t = strchr(line, '\n')) != NULL) {
473                         *t = '\0';
474                 }
475
476                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
477                         continue;
478                 }
479
480                 /* Leading colon... */
481                 // tok = strtok(line, ":");
482
483                 /* name */
484                 tok = strtok(line, ":");
485                 if (tok == NULL) {
486                         fprintf(stderr, "bad line (%s) - missing name\n", line);
487                         continue;
488                 }
489                 name = tok;
490
491                 /* link_state */
492                 tok = strtok(NULL, ":");
493                 if (tok == NULL) {
494                         fprintf(stderr, "bad line (%s) - missing link state\n",
495                                 line);
496                         continue;
497                 }
498                 link_state = (uint16_t)strtoul(tok, NULL, 0);
499
500                 /* references... */
501                 tok = strtok(NULL, ":");
502                 if (tok == NULL) {
503                         fprintf(stderr, "bad line (%s) - missing references\n",
504                                 line);
505                         continue;
506                 }
507                 references = (uint32_t)strtoul(tok, NULL, 0);
508
509                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
510                                                   struct interface,
511                                                   iface_map->num + 1);
512                 if (iface_map->iface == NULL) {
513                         goto fail;
514                 }
515
516                 iface = &iface_map->iface[iface_map->num];
517
518                 iface->name = talloc_strdup(iface_map, name);
519                 if (iface->name == NULL) {
520                         goto fail;
521                 }
522                 iface->link_up = link_state;
523                 iface->references = references;
524
525                 iface_map->num += 1;
526         }
527
528         if (iface_map->num == 0) {
529                 goto fail;
530         }
531
532         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
533         return true;
534
535 fail:
536         fprintf(stderr, "Parsing interfaces failed\n");
537         return false;
538 }
539
540 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
541 {
542         struct vnn_map *vnn_map;
543
544         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
545         if (vnn_map == NULL) {
546                 fprintf(stderr, "Memory error\n");
547                 return NULL;
548         }
549         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
550         vnn_map->generation = INVALID_GENERATION;
551
552         return vnn_map;
553 }
554
555 /* Read vnn map.
556  * output:
557  *   <GENERATION>
558  *   <LMASTER0>
559  *   <LMASTER1>
560  *   ...
561  */
562
563 static bool vnnmap_parse(struct vnn_map *vnn_map)
564 {
565         char line[1024];
566
567         while (fgets(line, sizeof(line), stdin) != NULL) {
568                 uint32_t n;
569                 char *t;
570
571                 if (line[0] == '\n') {
572                         break;
573                 }
574
575                 /* Get rid of pesky newline */
576                 if ((t = strchr(line, '\n')) != NULL) {
577                         *t = '\0';
578                 }
579
580                 n = (uint32_t) strtol(line, NULL, 0);
581
582                 /* generation */
583                 if (vnn_map->generation == INVALID_GENERATION) {
584                         vnn_map->generation = n;
585                         continue;
586                 }
587
588                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
589                                               vnn_map->size + 1);
590                 if (vnn_map->map == NULL) {
591                         fprintf(stderr, "Memory error\n");
592                         goto fail;
593                 }
594
595                 vnn_map->map[vnn_map->size] = n;
596                 vnn_map->size += 1;
597         }
598
599         if (vnn_map->size == 0) {
600                 goto fail;
601         }
602
603         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
604         return true;
605
606 fail:
607         fprintf(stderr, "Parsing vnnmap failed\n");
608         return false;
609 }
610
611 static bool reclock_parse(struct ctdbd_context *ctdb)
612 {
613         char line[1024];
614         char *t;
615
616         if (fgets(line, sizeof(line), stdin) == NULL) {
617                 goto fail;
618         }
619
620         if (line[0] == '\n') {
621                 goto fail;
622         }
623
624         /* Get rid of pesky newline */
625         if ((t = strchr(line, '\n')) != NULL) {
626                 *t = '\0';
627         }
628
629         ctdb->reclock = talloc_strdup(ctdb, line);
630         if (ctdb->reclock == NULL) {
631                 goto fail;
632         }
633
634         /* Swallow possible blank line following section.  Picky
635          * compiler settings don't allow the return value to be
636          * ignored, so make the compiler happy.
637          */
638         if (fgets(line, sizeof(line), stdin) == NULL) {
639                 ;
640         }
641         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
642         return true;
643
644 fail:
645         fprintf(stderr, "Parsing reclock failed\n");
646         return false;
647 }
648
649 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
650                                        const char *dbdir)
651 {
652         struct database_map *db_map;
653
654         db_map = talloc_zero(mem_ctx, struct database_map);
655         if (db_map == NULL) {
656                 return NULL;
657         }
658
659         db_map->dbdir = talloc_strdup(db_map, dbdir);
660         if (db_map->dbdir == NULL) {
661                 talloc_free(db_map);
662                 return NULL;
663         }
664
665         return db_map;
666 }
667
668 /* Read a database map from stdin.  Each line looks like:
669  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
670  * EOF or a blank line terminates input.
671  *
672  * By default, flags and seq_num are 0
673  */
674
675 static bool dbmap_parse(struct database_map *db_map)
676 {
677         char line[1024];
678
679         while ((fgets(line, sizeof(line), stdin) != NULL)) {
680                 uint32_t id;
681                 uint8_t flags = 0;
682                 uint32_t seq_num = 0;
683                 char *tok, *t;
684                 char *name;
685                 struct database *db;
686
687                 if (line[0] == '\n') {
688                         break;
689                 }
690
691                 /* Get rid of pesky newline */
692                 if ((t = strchr(line, '\n')) != NULL) {
693                         *t = '\0';
694                 }
695
696                 /* Get ID */
697                 tok = strtok(line, " \t");
698                 if (tok == NULL) {
699                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
700                         continue;
701                 }
702                 id = (uint32_t)strtoul(tok, NULL, 0);
703
704                 /* Get NAME */
705                 tok = strtok(NULL, " \t");
706                 if (tok == NULL) {
707                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
708                         continue;
709                 }
710                 name = talloc_strdup(db_map, tok);
711                 if (name == NULL) {
712                         goto fail;
713                 }
714
715                 /* Get flags */
716                 tok = strtok(NULL, " \t");
717                 while (tok != NULL) {
718                         if (strcmp(tok, "PERSISTENT") == 0) {
719                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
720                         } else if (strcmp(tok, "STICKY") == 0) {
721                                 flags |= CTDB_DB_FLAGS_STICKY;
722                         } else if (strcmp(tok, "READONLY") == 0) {
723                                 flags |= CTDB_DB_FLAGS_READONLY;
724                         } else if (strcmp(tok, "REPLICATED") == 0) {
725                                 flags |= CTDB_DB_FLAGS_REPLICATED;
726                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
727                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
728                                              CTDB_DB_FLAGS_REPLICATED;
729
730                                 if ((flags & nv) == 0) {
731                                         fprintf(stderr,
732                                                 "seq_num for volatile db\n");
733                                         goto fail;
734                                 }
735                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
736                         }
737
738                         tok = strtok(NULL, " \t");
739                 }
740
741                 db = talloc_zero(db_map, struct database);
742                 if (db == NULL) {
743                         goto fail;
744                 }
745
746                 db->id = id;
747                 db->name = talloc_steal(db, name);
748                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
749                 if (db->path == NULL) {
750                         talloc_free(db);
751                         goto fail;
752                 }
753                 db->flags = flags;
754                 db->seq_num = seq_num;
755
756                 DLIST_ADD_END(db_map->db, db);
757         }
758
759         if (db_map->db == NULL) {
760                 goto fail;
761         }
762
763         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
764         return true;
765
766 fail:
767         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
768         return false;
769
770 }
771
772 static struct database *database_find(struct database_map *db_map,
773                                       uint32_t db_id)
774 {
775         struct database *db;
776
777         for (db = db_map->db; db != NULL; db = db->next) {
778                 if (db->id == db_id) {
779                         return db;
780                 }
781         }
782
783         return NULL;
784 }
785
786 static int database_count(struct database_map *db_map)
787 {
788         struct database *db;
789         int count = 0;
790
791         for (db = db_map->db; db != NULL; db = db->next) {
792                 count += 1;
793         }
794
795         return count;
796 }
797
798 static int database_flags(uint8_t db_flags)
799 {
800         int tdb_flags = 0;
801
802         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
803                 tdb_flags = TDB_DEFAULT;
804         } else {
805                 /* volatile and replicated use the same flags */
806                 tdb_flags = TDB_NOSYNC |
807                             TDB_CLEAR_IF_FIRST |
808                             TDB_INCOMPATIBLE_HASH;
809         }
810
811         tdb_flags |= TDB_DISALLOW_NESTING;
812
813         return tdb_flags;
814 }
815
816 static struct database *database_new(struct database_map *db_map,
817                                      const char *name, uint8_t flags)
818 {
819         struct database *db;
820         TDB_DATA key;
821         int tdb_flags;
822
823         db = talloc_zero(db_map, struct database);
824         if (db == NULL) {
825                 return NULL;
826         }
827
828         db->name = talloc_strdup(db, name);
829         if (db->name == NULL) {
830                 goto fail;
831         }
832
833         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
834         if (db->path == NULL) {
835                 goto fail;
836         }
837
838         key.dsize = strlen(db->name) + 1;
839         key.dptr = discard_const(db->name);
840
841         db->id = tdb_jenkins_hash(&key);
842         db->flags = flags;
843
844         tdb_flags = database_flags(flags);
845
846         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
847         if (db->tdb == NULL) {
848                 DBG_ERR("tdb_open\n");
849                 goto fail;
850         }
851
852         DLIST_ADD_END(db_map->db, db);
853         return db;
854
855 fail:
856         DBG_ERR("Memory error\n");
857         talloc_free(db);
858         return NULL;
859
860 }
861
862 static int ltdb_store(struct database *db, TDB_DATA key,
863                       struct ctdb_ltdb_header *header, TDB_DATA data)
864 {
865         int ret;
866         bool db_volatile = true;
867         bool keep = false;
868
869         if (db->tdb == NULL) {
870                 return EINVAL;
871         }
872
873         if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
874             (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
875                 db_volatile = false;
876         }
877
878         if (data.dsize > 0) {
879                 keep = true;
880         } else {
881                 if (db_volatile && header->rsn == 0) {
882                         keep = true;
883                 }
884         }
885
886         if (keep) {
887                 TDB_DATA rec[2];
888
889                 rec[0].dsize = ctdb_ltdb_header_len(header);
890                 rec[0].dptr = (uint8_t *)header;
891
892                 rec[1].dsize = data.dsize;
893                 rec[1].dptr = data.dptr;
894
895                 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
896         } else {
897                 if (header->rsn > 0) {
898                         ret = tdb_delete(db->tdb, key);
899                 } else {
900                         ret = 0;
901                 }
902         }
903
904         return ret;
905 }
906
907 static int ltdb_fetch(struct database *db, TDB_DATA key,
908                       struct ctdb_ltdb_header *header,
909                       TALLOC_CTX *mem_ctx, TDB_DATA *data)
910 {
911         TDB_DATA rec;
912         size_t np;
913         int ret;
914
915         if (db->tdb == NULL) {
916                 return EINVAL;
917         }
918
919         rec = tdb_fetch(db->tdb, key);
920         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
921         if (ret != 0) {
922                 if (rec.dptr != NULL) {
923                         free(rec.dptr);
924                 }
925
926                 *header = (struct ctdb_ltdb_header) {
927                         .rsn = 0,
928                         .dmaster = 0,
929                         .flags = 0,
930                 };
931
932                 ret = ltdb_store(db, key, header, tdb_null);
933                 if (ret != 0) {
934                         return ret;
935                 }
936
937                 *data = tdb_null;
938                 return 0;
939         }
940
941         data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
942         data->dptr = talloc_memdup(mem_ctx,
943                                    rec.dptr + ctdb_ltdb_header_len(header),
944                                    data->dsize);
945
946         free(rec.dptr);
947
948         if (data->dptr == NULL) {
949                 return ENOMEM;
950         }
951
952         return 0;
953 }
954
955 static int database_seqnum(struct database *db, uint64_t *seqnum)
956 {
957         const char *keyname = CTDB_DB_SEQNUM_KEY;
958         TDB_DATA key, data;
959         struct ctdb_ltdb_header header;
960         size_t np;
961         int ret;
962
963         if (db->tdb == NULL) {
964                 *seqnum = db->seq_num;
965                 return 0;
966         }
967
968         key.dptr = discard_const(keyname);
969         key.dsize = strlen(keyname) + 1;
970
971         ret = ltdb_fetch(db, key, &header, db, &data);
972         if (ret != 0) {
973                 return ret;
974         }
975
976         if (data.dsize == 0) {
977                 *seqnum = 0;
978                 return 0;
979         }
980
981         ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
982         talloc_free(data.dptr);
983         if (ret != 0) {
984                 *seqnum = 0;
985         }
986
987         return ret;
988 }
989
990 static int ltdb_transaction_update(uint32_t reqid,
991                                    struct ctdb_ltdb_header *no_header,
992                                    TDB_DATA key, TDB_DATA data,
993                                    void *private_data)
994 {
995         struct database *db = (struct database *)private_data;
996         TALLOC_CTX *tmp_ctx = talloc_new(db);
997         struct ctdb_ltdb_header header = { 0 }, oldheader;
998         TDB_DATA olddata;
999         int ret;
1000
1001         if (db->tdb == NULL) {
1002                 return EINVAL;
1003         }
1004
1005         ret = ctdb_ltdb_header_extract(&data, &header);
1006         if (ret != 0) {
1007                 return ret;
1008         }
1009
1010         ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
1011         if (ret != 0) {
1012                 return ret;
1013         }
1014
1015         if (olddata.dsize > 0) {
1016                 if (oldheader.rsn > header.rsn ||
1017                     (oldheader.rsn == header.rsn &&
1018                      olddata.dsize != data.dsize)) {
1019                         return -1;
1020                 }
1021         }
1022
1023         talloc_free(tmp_ctx);
1024
1025         ret = ltdb_store(db, key, &header, data);
1026         return ret;
1027 }
1028
1029 static int ltdb_transaction(struct database *db,
1030                             struct ctdb_rec_buffer *recbuf)
1031 {
1032         int ret;
1033
1034         if (db->tdb == NULL) {
1035                 return EINVAL;
1036         }
1037
1038         ret = tdb_transaction_start(db->tdb);
1039         if (ret == -1) {
1040                 return ret;
1041         }
1042
1043         ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
1044         if (ret != 0) {
1045                 tdb_transaction_cancel(db->tdb);
1046         }
1047
1048         ret = tdb_transaction_commit(db->tdb);
1049         return ret;
1050 }
1051
1052 static bool public_ips_parse(struct ctdbd_context *ctdb,
1053                              uint32_t numnodes)
1054 {
1055         bool status;
1056
1057         if (numnodes == 0) {
1058                 D_ERR("Must initialise nodemap before public IPs\n");
1059                 return false;
1060         }
1061
1062         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
1063
1064         status = (ctdb->known_ips != NULL && ctdb->known_ips->num != 0);
1065
1066         if (status) {
1067                 D_INFO("Parsing public IPs done\n");
1068         } else {
1069                 D_INFO("Parsing public IPs failed\n");
1070         }
1071
1072         return status;
1073 }
1074
1075 /* Read information about controls to fail.  Format is:
1076  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
1077  */
1078 static bool control_failures_parse(struct ctdbd_context *ctdb)
1079 {
1080         char line[1024];
1081
1082         while ((fgets(line, sizeof(line), stdin) != NULL)) {
1083                 char *tok, *t;
1084                 enum ctdb_controls opcode;
1085                 uint32_t pnn;
1086                 const char *error;
1087                 const char *comment;
1088                 struct fake_control_failure *failure = NULL;
1089
1090                 if (line[0] == '\n') {
1091                         break;
1092                 }
1093
1094                 /* Get rid of pesky newline */
1095                 if ((t = strchr(line, '\n')) != NULL) {
1096                         *t = '\0';
1097                 }
1098
1099                 /* Get opcode */
1100                 tok = strtok(line, " \t");
1101                 if (tok == NULL) {
1102                         D_ERR("bad line (%s) - missing opcode\n", line);
1103                         continue;
1104                 }
1105                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1106
1107                 /* Get PNN */
1108                 tok = strtok(NULL, " \t");
1109                 if (tok == NULL) {
1110                         D_ERR("bad line (%s) - missing PNN\n", line);
1111                         continue;
1112                 }
1113                 pnn = (uint32_t)strtoul(tok, NULL, 0);
1114
1115                 /* Get error */
1116                 tok = strtok(NULL, " \t");
1117                 if (tok == NULL) {
1118                         D_ERR("bad line (%s) - missing errno\n", line);
1119                         continue;
1120                 }
1121                 error = talloc_strdup(ctdb, tok);
1122                 if (error == NULL) {
1123                         goto fail;
1124                 }
1125                 if (strcmp(error, "ERROR") != 0 &&
1126                     strcmp(error, "TIMEOUT") != 0) {
1127                         D_ERR("bad line (%s) "
1128                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1129                               line);
1130                         goto fail;
1131                 }
1132
1133                 /* Get comment */
1134                 tok = strtok(NULL, "\n"); /* rest of line */
1135                 if (tok == NULL) {
1136                         D_ERR("bad line (%s) - missing comment\n", line);
1137                         continue;
1138                 }
1139                 comment = talloc_strdup(ctdb, tok);
1140                 if (comment == NULL) {
1141                         goto fail;
1142                 }
1143
1144                 failure = talloc_zero(ctdb, struct fake_control_failure);
1145                 if (failure == NULL) {
1146                         goto fail;
1147                 }
1148
1149                 failure->opcode = opcode;
1150                 failure->pnn = pnn;
1151                 failure->error = error;
1152                 failure->comment = comment;
1153
1154                 DLIST_ADD(ctdb->control_failures, failure);
1155         }
1156
1157         if (ctdb->control_failures == NULL) {
1158                 goto fail;
1159         }
1160
1161         D_INFO("Parsing fake control failures done\n");
1162         return true;
1163
1164 fail:
1165         D_INFO("Parsing fake control failures failed\n");
1166         return false;
1167 }
1168
1169 static bool runstate_parse(struct ctdbd_context *ctdb)
1170 {
1171         char line[1024];
1172         char *t;
1173
1174         if (fgets(line, sizeof(line), stdin) == NULL) {
1175                 goto fail;
1176         }
1177
1178         if (line[0] == '\n') {
1179                 goto fail;
1180         }
1181
1182         /* Get rid of pesky newline */
1183         if ((t = strchr(line, '\n')) != NULL) {
1184                 *t = '\0';
1185         }
1186
1187         ctdb->runstate = ctdb_runstate_from_string(line);
1188         if (ctdb->runstate == CTDB_RUNSTATE_UNKNOWN) {
1189                 goto fail;
1190         }
1191
1192         /* Swallow possible blank line following section.  Picky
1193          * compiler settings don't allow the return value to be
1194          * ignored, so make the compiler happy.
1195          */
1196         if (fgets(line, sizeof(line), stdin) == NULL) {
1197                 ;
1198         }
1199         D_INFO("Parsing runstate done\n");
1200         return true;
1201
1202 fail:
1203         D_ERR("Parsing runstate failed\n");
1204         return false;
1205 }
1206
1207 /*
1208  * Manage clients
1209  */
1210
1211 static int ctdb_client_destructor(struct ctdb_client *client)
1212 {
1213         DLIST_REMOVE(client->ctdb->client_list, client);
1214         return 0;
1215 }
1216
1217 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1218                       void *client_state)
1219 {
1220         struct ctdb_client *client;
1221
1222         client = talloc_zero(client_state, struct ctdb_client);
1223         if (client == NULL) {
1224                 return ENOMEM;
1225         }
1226
1227         client->ctdb = ctdb;
1228         client->pid = client_pid;
1229         client->state = client_state;
1230
1231         DLIST_ADD(ctdb->client_list, client);
1232         talloc_set_destructor(client, ctdb_client_destructor);
1233         return 0;
1234 }
1235
1236 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1237 {
1238         struct ctdb_client *client;
1239
1240         for (client=ctdb->client_list; client != NULL; client=client->next) {
1241                 if (client->pid == client_pid) {
1242                         return client->state;
1243                 }
1244         }
1245
1246         return NULL;
1247 }
1248
1249 /*
1250  * CTDB context setup
1251  */
1252
1253 static uint32_t new_generation(uint32_t old_generation)
1254 {
1255         uint32_t generation;
1256
1257         while (1) {
1258                 generation = random();
1259                 if (generation != INVALID_GENERATION &&
1260                     generation != old_generation) {
1261                         break;
1262                 }
1263         }
1264
1265         return generation;
1266 }
1267
1268 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1269                                          const char *dbdir)
1270 {
1271         struct ctdbd_context *ctdb;
1272         char line[1024];
1273         bool status;
1274         int ret;
1275
1276         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1277         if (ctdb == NULL) {
1278                 return NULL;
1279         }
1280
1281         ctdb->node_map = nodemap_init(ctdb);
1282         if (ctdb->node_map == NULL) {
1283                 goto fail;
1284         }
1285
1286         ctdb->iface_map = interfaces_init(ctdb);
1287         if (ctdb->iface_map == NULL) {
1288                 goto fail;
1289         }
1290
1291         ctdb->vnn_map = vnnmap_init(ctdb);
1292         if (ctdb->vnn_map == NULL) {
1293                 goto fail;
1294         }
1295
1296         ctdb->db_map = dbmap_init(ctdb, dbdir);
1297         if (ctdb->db_map == NULL) {
1298                 goto fail;
1299         }
1300
1301         ret = srvid_init(ctdb, &ctdb->srv);
1302         if (ret != 0) {
1303                 goto fail;
1304         }
1305
1306         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1307
1308         while (fgets(line, sizeof(line), stdin) != NULL) {
1309                 char *t;
1310
1311                 if ((t = strchr(line, '\n')) != NULL) {
1312                         *t = '\0';
1313                 }
1314
1315                 if (strcmp(line, "NODEMAP") == 0) {
1316                         status = nodemap_parse(ctdb->node_map);
1317                 } else if (strcmp(line, "IFACES") == 0) {
1318                         status = interfaces_parse(ctdb->iface_map);
1319                 } else if (strcmp(line, "VNNMAP") == 0) {
1320                         status = vnnmap_parse(ctdb->vnn_map);
1321                 } else if (strcmp(line, "DBMAP") == 0) {
1322                         status = dbmap_parse(ctdb->db_map);
1323                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1324                         status = public_ips_parse(ctdb,
1325                                                   ctdb->node_map->num_nodes);
1326                 } else if (strcmp(line, "RECLOCK") == 0) {
1327                         status = reclock_parse(ctdb);
1328                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1329                         status = control_failures_parse(ctdb);
1330                 } else if (strcmp(line, "RUNSTATE") == 0) {
1331                         status = runstate_parse(ctdb);
1332                 } else {
1333                         fprintf(stderr, "Unknown line %s\n", line);
1334                         status = false;
1335                 }
1336
1337                 if (! status) {
1338                         goto fail;
1339                 }
1340         }
1341
1342         ctdb->start_time = tevent_timeval_current();
1343         ctdb->recovery_start_time = tevent_timeval_current();
1344         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1345         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1346                 ctdb->vnn_map->generation =
1347                         new_generation(ctdb->vnn_map->generation);
1348         }
1349         ctdb->recovery_end_time = tevent_timeval_current();
1350
1351         ctdb->log_level = DEBUG_ERR;
1352
1353         ctdb_tunable_set_defaults(&ctdb->tun_list);
1354
1355         return ctdb;
1356
1357 fail:
1358         TALLOC_FREE(ctdb);
1359         return NULL;
1360 }
1361
1362 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1363 {
1364         struct node *node;
1365         unsigned int i;
1366
1367         if (ctdb->node_map->num_nodes == 0) {
1368                 return true;
1369         }
1370
1371         /* Make sure all the nodes are in order */
1372         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1373                 node = &ctdb->node_map->node[i];
1374                 if (node->pnn != i) {
1375                         fprintf(stderr, "Expected node %u, found %u\n",
1376                                 i, node->pnn);
1377                         return false;
1378                 }
1379         }
1380
1381         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1382         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1383                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1384                 exit(0);
1385         }
1386
1387         return true;
1388 }
1389
1390 /*
1391  * Doing a recovery
1392  */
1393
/*
 * Per-request state of the recover_send()/recover_recv() computation.
 */
struct recover_state {
        /* Event context used to schedule the wakeup timers */
        struct tevent_context *ev;
        /* Daemon context whose node map and vnn map are consulted */
        struct ctdbd_context *ctdb;
};
1398
1399 static int recover_check(struct tevent_req *req);
1400 static void recover_wait_done(struct tevent_req *subreq);
1401 static void recover_done(struct tevent_req *subreq);
1402
1403 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1404                                        struct tevent_context *ev,
1405                                        struct ctdbd_context *ctdb)
1406 {
1407         struct tevent_req *req;
1408         struct recover_state *state;
1409         int ret;
1410
1411         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1412         if (req == NULL) {
1413                 return NULL;
1414         }
1415
1416         state->ev = ev;
1417         state->ctdb = ctdb;
1418
1419         ret = recover_check(req);
1420         if (ret != 0) {
1421                 tevent_req_error(req, ret);
1422                 return tevent_req_post(req, ev);
1423         }
1424
1425         return req;
1426 }
1427
1428 static int recover_check(struct tevent_req *req)
1429 {
1430         struct recover_state *state = tevent_req_data(
1431                 req, struct recover_state);
1432         struct ctdbd_context *ctdb = state->ctdb;
1433         struct tevent_req *subreq;
1434         bool recovery_disabled;
1435         unsigned int i;
1436
1437         recovery_disabled = false;
1438         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1439                 if (ctdb->node_map->node[i].recovery_disabled) {
1440                         recovery_disabled = true;
1441                         break;
1442                 }
1443         }
1444
1445         subreq = tevent_wakeup_send(state, state->ev,
1446                                     tevent_timeval_current_ofs(1, 0));
1447         if (subreq == NULL) {
1448                 return ENOMEM;
1449         }
1450
1451         if (recovery_disabled) {
1452                 tevent_req_set_callback(subreq, recover_wait_done, req);
1453         } else {
1454                 ctdb->recovery_start_time = tevent_timeval_current();
1455                 tevent_req_set_callback(subreq, recover_done, req);
1456         }
1457
1458         return 0;
1459 }
1460
1461 static void recover_wait_done(struct tevent_req *subreq)
1462 {
1463         struct tevent_req *req = tevent_req_callback_data(
1464                 subreq, struct tevent_req);
1465         int ret;
1466         bool status;
1467
1468         status = tevent_wakeup_recv(subreq);
1469         TALLOC_FREE(subreq);
1470         if (! status) {
1471                 tevent_req_error(req, EIO);
1472                 return;
1473         }
1474
1475         ret = recover_check(req);
1476         if (ret != 0) {
1477                 tevent_req_error(req, ret);
1478         }
1479 }
1480
1481 static void recover_done(struct tevent_req *subreq)
1482 {
1483         struct tevent_req *req = tevent_req_callback_data(
1484                 subreq, struct tevent_req);
1485         struct recover_state *state = tevent_req_data(
1486                 req, struct recover_state);
1487         struct ctdbd_context *ctdb = state->ctdb;
1488         bool status;
1489
1490         status = tevent_wakeup_recv(subreq);
1491         TALLOC_FREE(subreq);
1492         if (! status) {
1493                 tevent_req_error(req, EIO);
1494                 return;
1495         }
1496
1497         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1498         ctdb->recovery_end_time = tevent_timeval_current();
1499         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1500
1501         tevent_req_done(req);
1502 }
1503
/*
 * Collect the result of a recovery request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }

        return false;
}
1517
1518 /*
1519  * Routines for ctdb_req_header
1520  */
1521
1522 static void header_fix_pnn(struct ctdb_req_header *header,
1523                            struct ctdbd_context *ctdb)
1524 {
1525         if (header->srcnode == CTDB_CURRENT_NODE) {
1526                 header->srcnode = ctdb->node_map->pnn;
1527         }
1528
1529         if (header->destnode == CTDB_CURRENT_NODE) {
1530                 header->destnode = ctdb->node_map->pnn;
1531         }
1532 }
1533
1534 static struct ctdb_req_header header_reply_call(
1535                                         struct ctdb_req_header *header,
1536                                         struct ctdbd_context *ctdb)
1537 {
1538         struct ctdb_req_header reply_header;
1539
1540         reply_header = (struct ctdb_req_header) {
1541                 .ctdb_magic = CTDB_MAGIC,
1542                 .ctdb_version = CTDB_PROTOCOL,
1543                 .generation = ctdb->vnn_map->generation,
1544                 .operation = CTDB_REPLY_CALL,
1545                 .destnode = header->srcnode,
1546                 .srcnode = header->destnode,
1547                 .reqid = header->reqid,
1548         };
1549
1550         return reply_header;
1551 }
1552
1553 static struct ctdb_req_header header_reply_control(
1554                                         struct ctdb_req_header *header,
1555                                         struct ctdbd_context *ctdb)
1556 {
1557         struct ctdb_req_header reply_header;
1558
1559         reply_header = (struct ctdb_req_header) {
1560                 .ctdb_magic = CTDB_MAGIC,
1561                 .ctdb_version = CTDB_PROTOCOL,
1562                 .generation = ctdb->vnn_map->generation,
1563                 .operation = CTDB_REPLY_CONTROL,
1564                 .destnode = header->srcnode,
1565                 .srcnode = header->destnode,
1566                 .reqid = header->reqid,
1567         };
1568
1569         return reply_header;
1570 }
1571
1572 static struct ctdb_req_header header_reply_message(
1573                                         struct ctdb_req_header *header,
1574                                         struct ctdbd_context *ctdb)
1575 {
1576         struct ctdb_req_header reply_header;
1577
1578         reply_header = (struct ctdb_req_header) {
1579                 .ctdb_magic = CTDB_MAGIC,
1580                 .ctdb_version = CTDB_PROTOCOL,
1581                 .generation = ctdb->vnn_map->generation,
1582                 .operation = CTDB_REQ_MESSAGE,
1583                 .destnode = header->srcnode,
1584                 .srcnode = header->destnode,
1585                 .reqid = 0,
1586         };
1587
1588         return reply_header;
1589 }
1590
1591 /*
1592  * Client state
1593  */
1594
/*
 * State of one client connection; this is the tevent_req state that
 * the client_send_*() and control_*() handlers retrieve from the
 * request.
 */
struct client_state {
        /* Event context used when queueing reply writes */
        struct tevent_context *ev;
        /* NOTE(review): presumably the client socket - confirm */
        int fd;
        /* Fake daemon context this client talks to */
        struct ctdbd_context *ctdb;
        /* NOTE(review): not used in this part of the file - confirm meaning */
        int pnn;
        /* NOTE(review): presumably the PID of the client process - confirm */
        pid_t pid;
        /* Comm context that replies are written through */
        struct comm_context *comm;
        /* NOTE(review): presumably tracks srvid registrations - confirm */
        struct srvid_register_state *rstate;
        /* NOTE(review): not used in this part of the file - confirm meaning */
        int status;
};
1605
1606 /*
1607  * Send replies to call, controls and messages
1608  */
1609
1610 static void client_reply_done(struct tevent_req *subreq);
1611
1612 static void client_send_call(struct tevent_req *req,
1613                              struct ctdb_req_header *header,
1614                              struct ctdb_reply_call *reply)
1615 {
1616         struct client_state *state = tevent_req_data(
1617                 req, struct client_state);
1618         struct ctdbd_context *ctdb = state->ctdb;
1619         struct tevent_req *subreq;
1620         struct ctdb_req_header reply_header;
1621         uint8_t *buf;
1622         size_t datalen, buflen;
1623         int ret;
1624
1625         reply_header = header_reply_call(header, ctdb);
1626
1627         datalen = ctdb_reply_call_len(&reply_header, reply);
1628         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1629         if (ret != 0) {
1630                 tevent_req_error(req, ret);
1631                 return;
1632         }
1633
1634         ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
1635         if (ret != 0) {
1636                 tevent_req_error(req, ret);
1637                 return;
1638         }
1639
1640         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1641         if (tevent_req_nomem(subreq, req)) {
1642                 return;
1643         }
1644         tevent_req_set_callback(subreq, client_reply_done, req);
1645
1646         talloc_steal(subreq, buf);
1647 }
1648
1649 static void client_send_message(struct tevent_req *req,
1650                                 struct ctdb_req_header *header,
1651                                 struct ctdb_req_message_data *message)
1652 {
1653         struct client_state *state = tevent_req_data(
1654                 req, struct client_state);
1655         struct ctdbd_context *ctdb = state->ctdb;
1656         struct tevent_req *subreq;
1657         struct ctdb_req_header reply_header;
1658         uint8_t *buf;
1659         size_t datalen, buflen;
1660         int ret;
1661
1662         reply_header = header_reply_message(header, ctdb);
1663
1664         datalen = ctdb_req_message_data_len(&reply_header, message);
1665         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1666         if (ret != 0) {
1667                 tevent_req_error(req, ret);
1668                 return;
1669         }
1670
1671         ret = ctdb_req_message_data_push(&reply_header, message,
1672                                          buf, &buflen);
1673         if (ret != 0) {
1674                 tevent_req_error(req, ret);
1675                 return;
1676         }
1677
1678         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1679
1680         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1681         if (tevent_req_nomem(subreq, req)) {
1682                 return;
1683         }
1684         tevent_req_set_callback(subreq, client_reply_done, req);
1685
1686         talloc_steal(subreq, buf);
1687 }
1688
1689 static void client_send_control(struct tevent_req *req,
1690                                 struct ctdb_req_header *header,
1691                                 struct ctdb_reply_control *reply)
1692 {
1693         struct client_state *state = tevent_req_data(
1694                 req, struct client_state);
1695         struct ctdbd_context *ctdb = state->ctdb;
1696         struct tevent_req *subreq;
1697         struct ctdb_req_header reply_header;
1698         uint8_t *buf;
1699         size_t datalen, buflen;
1700         int ret;
1701
1702         reply_header = header_reply_control(header, ctdb);
1703
1704         datalen = ctdb_reply_control_len(&reply_header, reply);
1705         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1706         if (ret != 0) {
1707                 tevent_req_error(req, ret);
1708                 return;
1709         }
1710
1711         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1712         if (ret != 0) {
1713                 tevent_req_error(req, ret);
1714                 return;
1715         }
1716
1717         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1718
1719         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1720         if (tevent_req_nomem(subreq, req)) {
1721                 return;
1722         }
1723         tevent_req_set_callback(subreq, client_reply_done, req);
1724
1725         talloc_steal(subreq, buf);
1726 }
1727
1728 static void client_reply_done(struct tevent_req *subreq)
1729 {
1730         struct tevent_req *req = tevent_req_callback_data(
1731                 subreq, struct tevent_req);
1732         int ret;
1733         bool status;
1734
1735         status = comm_write_recv(subreq, &ret);
1736         TALLOC_FREE(subreq);
1737         if (! status) {
1738                 tevent_req_error(req, ret);
1739         }
1740 }
1741
1742 /*
1743  * Handling protocol - controls
1744  */
1745
1746 static void control_process_exists(TALLOC_CTX *mem_ctx,
1747                                    struct tevent_req *req,
1748                                    struct ctdb_req_header *header,
1749                                    struct ctdb_req_control *request)
1750 {
1751         struct client_state *state = tevent_req_data(
1752                 req, struct client_state);
1753         struct ctdbd_context *ctdb = state->ctdb;
1754         struct client_state *cstate;
1755         struct ctdb_reply_control reply;
1756
1757         reply.rdata.opcode = request->opcode;
1758
1759         cstate = client_find(ctdb, request->rdata.data.pid);
1760         if (cstate == NULL) {
1761                 reply.status = -1;
1762                 reply.errmsg = "No client for PID";
1763         } else {
1764                 reply.status = kill(request->rdata.data.pid, 0);
1765                 reply.errmsg = NULL;
1766         }
1767
1768         client_send_control(req, header, &reply);
1769 }
1770
1771 static void control_ping(TALLOC_CTX *mem_ctx,
1772                          struct tevent_req *req,
1773                          struct ctdb_req_header *header,
1774                          struct ctdb_req_control *request)
1775 {
1776         struct client_state *state = tevent_req_data(
1777                 req, struct client_state);
1778         struct ctdbd_context *ctdb = state->ctdb;
1779         struct ctdb_reply_control reply;
1780
1781         reply.rdata.opcode = request->opcode;
1782         reply.status = ctdb->num_clients;
1783         reply.errmsg = NULL;
1784
1785         client_send_control(req, header, &reply);
1786 }
1787
1788 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1789                               struct tevent_req *req,
1790                               struct ctdb_req_header *header,
1791                               struct ctdb_req_control *request)
1792 {
1793         struct client_state *state = tevent_req_data(
1794                 req, struct client_state);
1795         struct ctdbd_context *ctdb = state->ctdb;
1796         struct ctdb_reply_control reply;
1797         struct database *db;
1798
1799         reply.rdata.opcode = request->opcode;
1800
1801         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1802         if (db == NULL) {
1803                 reply.status = ENOENT;
1804                 reply.errmsg = "Database not found";
1805         } else {
1806                 reply.rdata.data.db_path =
1807                         talloc_strdup(mem_ctx, db->path);
1808                 if (reply.rdata.data.db_path == NULL) {
1809                         reply.status = ENOMEM;
1810                         reply.errmsg = "Memory error";
1811                 } else {
1812                         reply.status = 0;
1813                         reply.errmsg = NULL;
1814                 }
1815         }
1816
1817         client_send_control(req, header, &reply);
1818 }
1819
1820 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1821                               struct tevent_req *req,
1822                               struct ctdb_req_header *header,
1823                               struct ctdb_req_control *request)
1824 {
1825         struct client_state *state = tevent_req_data(
1826                 req, struct client_state);
1827         struct ctdbd_context *ctdb = state->ctdb;
1828         struct ctdb_reply_control reply;
1829         struct ctdb_vnn_map *vnnmap;
1830
1831         reply.rdata.opcode = request->opcode;
1832
1833         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1834         if (vnnmap == NULL) {
1835                 reply.status = ENOMEM;
1836                 reply.errmsg = "Memory error";
1837         } else {
1838                 vnnmap->generation = ctdb->vnn_map->generation;
1839                 vnnmap->size = ctdb->vnn_map->size;
1840                 vnnmap->map = ctdb->vnn_map->map;
1841
1842                 reply.rdata.data.vnnmap = vnnmap;
1843                 reply.status = 0;
1844                 reply.errmsg = NULL;
1845         }
1846
1847         client_send_control(req, header, &reply);
1848 }
1849
1850 static void control_get_debug(TALLOC_CTX *mem_ctx,
1851                               struct tevent_req *req,
1852                               struct ctdb_req_header *header,
1853                               struct ctdb_req_control *request)
1854 {
1855         struct client_state *state = tevent_req_data(
1856                 req, struct client_state);
1857         struct ctdbd_context *ctdb = state->ctdb;
1858         struct ctdb_reply_control reply;
1859
1860         reply.rdata.opcode = request->opcode;
1861         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1862         reply.status = 0;
1863         reply.errmsg = NULL;
1864
1865         client_send_control(req, header, &reply);
1866 }
1867
1868 static void control_set_debug(TALLOC_CTX *mem_ctx,
1869                               struct tevent_req *req,
1870                               struct ctdb_req_header *header,
1871                               struct ctdb_req_control *request)
1872 {
1873         struct client_state *state = tevent_req_data(
1874                 req, struct client_state);
1875         struct ctdbd_context *ctdb = state->ctdb;
1876         struct ctdb_reply_control reply;
1877
1878         ctdb->log_level = (int)request->rdata.data.loglevel;
1879
1880         reply.rdata.opcode = request->opcode;
1881         reply.status = 0;
1882         reply.errmsg = NULL;
1883
1884         client_send_control(req, header, &reply);
1885 }
1886
1887 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1888                               struct tevent_req *req,
1889                                struct ctdb_req_header *header,
1890                               struct ctdb_req_control *request)
1891 {
1892         struct client_state *state = tevent_req_data(
1893                 req, struct client_state);
1894         struct ctdbd_context *ctdb = state->ctdb;
1895         struct ctdb_reply_control reply;
1896         struct ctdb_dbid_map *dbmap;
1897         struct database *db;
1898         unsigned int i;
1899
1900         reply.rdata.opcode = request->opcode;
1901
1902         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1903         if (dbmap == NULL) {
1904                 goto fail;
1905         }
1906
1907         dbmap->num = database_count(ctdb->db_map);
1908         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1909         if (dbmap->dbs == NULL) {
1910                 goto fail;
1911         }
1912
1913         db = ctdb->db_map->db;
1914         for (i = 0; i < dbmap->num; i++) {
1915                 dbmap->dbs[i] = (struct ctdb_dbid) {
1916                         .db_id = db->id,
1917                         .flags = db->flags,
1918                 };
1919
1920                 db = db->next;
1921         }
1922
1923         reply.rdata.data.dbmap = dbmap;
1924         reply.status = 0;
1925         reply.errmsg = NULL;
1926         client_send_control(req, header, &reply);
1927         return;
1928
1929 fail:
1930         reply.status = -1;
1931         reply.errmsg = "Memory error";
1932         client_send_control(req, header, &reply);
1933 }
1934
1935 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1936                                 struct tevent_req *req,
1937                                 struct ctdb_req_header *header,
1938                                 struct ctdb_req_control *request)
1939 {
1940         struct client_state *state = tevent_req_data(
1941                 req, struct client_state);
1942         struct ctdbd_context *ctdb = state->ctdb;
1943         struct ctdb_reply_control reply;
1944
1945         reply.rdata.opcode = request->opcode;
1946         reply.status = ctdb->vnn_map->recmode;
1947         reply.errmsg = NULL;
1948
1949         client_send_control(req, header, &reply);
1950 }
1951
/*
 * State kept while the recovery started by control_set_recmode() is
 * in flight; set_recmode_callback() uses it to send the deferred reply.
 */
struct set_recmode_state {
        /* Client request that the deferred reply is sent on */
        struct tevent_req *req;
        struct ctdbd_context *ctdb;
        /* Copy of the request header, used when sending the reply */
        struct ctdb_req_header header;
        /* Reply whose status/errmsg are filled in by the callback */
        struct ctdb_reply_control reply;
};
1958
1959 static void set_recmode_callback(struct tevent_req *subreq)
1960 {
1961         struct set_recmode_state *substate = tevent_req_callback_data(
1962                 subreq, struct set_recmode_state);
1963         bool status;
1964         int ret;
1965
1966         status = recover_recv(subreq, &ret);
1967         TALLOC_FREE(subreq);
1968         if (! status) {
1969                 substate->reply.status = ret;
1970                 substate->reply.errmsg = "recovery failed";
1971         } else {
1972                 substate->reply.status = 0;
1973                 substate->reply.errmsg = NULL;
1974         }
1975
1976         client_send_control(substate->req, &substate->header, &substate->reply);
1977         talloc_free(substate);
1978 }
1979
1980 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1981                                 struct tevent_req *req,
1982                                 struct ctdb_req_header *header,
1983                                 struct ctdb_req_control *request)
1984 {
1985         struct client_state *state = tevent_req_data(
1986                 req, struct client_state);
1987         struct tevent_req *subreq;
1988         struct ctdbd_context *ctdb = state->ctdb;
1989         struct set_recmode_state *substate;
1990         struct ctdb_reply_control reply;
1991
1992         reply.rdata.opcode = request->opcode;
1993
1994         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1995                 reply.status = -1;
1996                 reply.errmsg = "Client cannot set recmode to NORMAL";
1997                 goto fail;
1998         }
1999
2000         substate = talloc_zero(ctdb, struct set_recmode_state);
2001         if (substate == NULL) {
2002                 reply.status = -1;
2003                 reply.errmsg = "Memory error";
2004                 goto fail;
2005         }
2006
2007         substate->req = req;
2008         substate->ctdb = ctdb;
2009         substate->header = *header;
2010         substate->reply.rdata.opcode = request->opcode;
2011
2012         subreq = recover_send(substate, state->ev, state->ctdb);
2013         if (subreq == NULL) {
2014                 talloc_free(substate);
2015                 goto fail;
2016         }
2017         tevent_req_set_callback(subreq, set_recmode_callback, substate);
2018
2019         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
2020         return;
2021
2022 fail:
2023         client_send_control(req, header, &reply);
2024
2025 }
2026
2027 static void control_db_attach(TALLOC_CTX *mem_ctx,
2028                               struct tevent_req *req,
2029                               struct ctdb_req_header *header,
2030                               struct ctdb_req_control *request)
2031 {
2032         struct client_state *state = tevent_req_data(
2033                 req, struct client_state);
2034         struct ctdbd_context *ctdb = state->ctdb;
2035         struct ctdb_reply_control reply;
2036         struct database *db;
2037
2038         reply.rdata.opcode = request->opcode;
2039
2040         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2041                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2042                         goto done;
2043                 }
2044         }
2045
2046         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
2047         if (db == NULL) {
2048                 reply.status = -1;
2049                 reply.errmsg = "Failed to attach database";
2050                 client_send_control(req, header, &reply);
2051                 return;
2052         }
2053
2054 done:
2055         reply.rdata.data.db_id = db->id;
2056         reply.status = 0;
2057         reply.errmsg = NULL;
2058         client_send_control(req, header, &reply);
2059 }
2060
2061 static void srvid_handler_done(struct tevent_req *subreq);
2062
2063 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2064 {
2065         struct client_state *state = talloc_get_type_abort(
2066                 private_data, struct client_state);
2067         struct ctdbd_context *ctdb = state->ctdb;
2068         struct tevent_req *subreq;
2069         struct ctdb_req_header request_header;
2070         struct ctdb_req_message_data message;
2071         uint8_t *buf;
2072         size_t datalen, buflen;
2073         int ret;
2074
2075         request_header = (struct ctdb_req_header) {
2076                 .ctdb_magic = CTDB_MAGIC,
2077                 .ctdb_version = CTDB_PROTOCOL,
2078                 .generation = ctdb->vnn_map->generation,
2079                 .operation = CTDB_REQ_MESSAGE,
2080                 .destnode = state->pnn,
2081                 .srcnode = ctdb->node_map->recmaster,
2082                 .reqid = 0,
2083         };
2084
2085         message = (struct ctdb_req_message_data) {
2086                 .srvid = srvid,
2087                 .data = data,
2088         };
2089
2090         datalen = ctdb_req_message_data_len(&request_header, &message);
2091         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
2092         if (ret != 0) {
2093                 return;
2094         }
2095
2096         ret = ctdb_req_message_data_push(&request_header,
2097                                          &message,
2098                                          buf,
2099                                          &buflen);
2100         if (ret != 0) {
2101                 talloc_free(buf);
2102                 return;
2103         }
2104
2105         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
2106         if (subreq == NULL) {
2107                 talloc_free(buf);
2108                 return;
2109         }
2110         tevent_req_set_callback(subreq, srvid_handler_done, state);
2111
2112         talloc_steal(subreq, buf);
2113 }
2114
2115 static void srvid_handler_done(struct tevent_req *subreq)
2116 {
2117         struct client_state *state = tevent_req_callback_data(
2118                 subreq, struct client_state);
2119         int ret;
2120         bool ok;
2121
2122         ok = comm_write_recv(subreq, &ret);
2123         TALLOC_FREE(subreq);
2124         if (!ok) {
2125                 DEBUG(DEBUG_ERR,
2126                       ("Failed to dispatch message to client pid=%u, ret=%d\n",
2127                        state->pid,
2128                        ret));
2129         }
2130 }
2131
2132 static void control_register_srvid(TALLOC_CTX *mem_ctx,
2133                                    struct tevent_req *req,
2134                                    struct ctdb_req_header *header,
2135                                    struct ctdb_req_control *request)
2136 {
2137         struct client_state *state = tevent_req_data(
2138                 req, struct client_state);
2139         struct ctdbd_context *ctdb = state->ctdb;
2140         struct ctdb_reply_control reply;
2141         int ret;
2142
2143         reply.rdata.opcode = request->opcode;
2144
2145         ret = srvid_register(ctdb->srv, state, request->srvid,
2146                              srvid_handler, state);
2147         if (ret != 0) {
2148                 reply.status = -1;
2149                 reply.errmsg = "Memory error";
2150                 goto fail;
2151         }
2152
2153         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
2154
2155         reply.status = 0;
2156         reply.errmsg = NULL;
2157
2158 fail:
2159         client_send_control(req, header, &reply);
2160 }
2161
2162 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
2163                                      struct tevent_req *req,
2164                                      struct ctdb_req_header *header,
2165                                      struct ctdb_req_control *request)
2166 {
2167         struct client_state *state = tevent_req_data(
2168                 req, struct client_state);
2169         struct ctdbd_context *ctdb = state->ctdb;
2170         struct ctdb_reply_control reply;
2171         int ret;
2172
2173         reply.rdata.opcode = request->opcode;
2174
2175         ret = srvid_deregister(ctdb->srv, request->srvid, state);
2176         if (ret != 0) {
2177                 reply.status = -1;
2178                 reply.errmsg = "srvid not registered";
2179                 goto fail;
2180         }
2181
2182         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
2183
2184         reply.status = 0;
2185         reply.errmsg = NULL;
2186
2187 fail:
2188         client_send_control(req, header, &reply);
2189 }
2190
2191 static void control_get_dbname(TALLOC_CTX *mem_ctx,
2192                                struct tevent_req *req,
2193                                struct ctdb_req_header *header,
2194                                struct ctdb_req_control *request)
2195 {
2196         struct client_state *state = tevent_req_data(
2197                 req, struct client_state);
2198         struct ctdbd_context *ctdb = state->ctdb;
2199         struct ctdb_reply_control reply;
2200         struct database *db;
2201
2202         reply.rdata.opcode = request->opcode;
2203
2204         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2205         if (db == NULL) {
2206                 reply.status = ENOENT;
2207                 reply.errmsg = "Database not found";
2208         } else {
2209                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
2210                 if (reply.rdata.data.db_name == NULL) {
2211                         reply.status = ENOMEM;
2212                         reply.errmsg = "Memory error";
2213                 } else {
2214                         reply.status = 0;
2215                         reply.errmsg = NULL;
2216                 }
2217         }
2218
2219         client_send_control(req, header, &reply);
2220 }
2221
2222 static void control_get_pid(TALLOC_CTX *mem_ctx,
2223                             struct tevent_req *req,
2224                             struct ctdb_req_header *header,
2225                             struct ctdb_req_control *request)
2226 {
2227         struct ctdb_reply_control reply;
2228
2229         reply.rdata.opcode = request->opcode;
2230         reply.status = getpid();
2231         reply.errmsg = NULL;
2232
2233         client_send_control(req, header, &reply);
2234 }
2235
2236 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2237                             struct tevent_req *req,
2238                             struct ctdb_req_header *header,
2239                             struct ctdb_req_control *request)
2240 {
2241         struct ctdb_reply_control reply;
2242
2243         reply.rdata.opcode = request->opcode;
2244         reply.status = header->destnode;
2245         reply.errmsg = NULL;
2246
2247         client_send_control(req, header, &reply);
2248 }
2249
2250 static void control_shutdown(TALLOC_CTX *mem_ctx,
2251                              struct tevent_req *req,
2252                              struct ctdb_req_header *hdr,
2253                              struct ctdb_req_control *request)
2254 {
2255         struct client_state *state = tevent_req_data(
2256                 req, struct client_state);
2257
2258         state->status = 99;
2259 }
2260
2261 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2262                                 struct tevent_req *req,
2263                                 struct ctdb_req_header *header,
2264                                 struct ctdb_req_control *request)
2265 {
2266         struct client_state *state = tevent_req_data(
2267                 req, struct client_state);
2268         struct ctdbd_context *ctdb = state->ctdb;
2269         struct ctdb_reply_control reply;
2270         bool ret, obsolete;
2271
2272         reply.rdata.opcode = request->opcode;
2273         reply.errmsg = NULL;
2274
2275         ret = ctdb_tunable_set_value(&ctdb->tun_list,
2276                                      request->rdata.data.tunable->name,
2277                                      request->rdata.data.tunable->value,
2278                                      &obsolete);
2279         if (! ret) {
2280                 reply.status = -1;
2281         } else if (obsolete) {
2282                 reply.status = 1;
2283         } else {
2284                 reply.status = 0;
2285         }
2286
2287         client_send_control(req, header, &reply);
2288 }
2289
2290 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2291                                 struct tevent_req *req,
2292                                 struct ctdb_req_header *header,
2293                                 struct ctdb_req_control *request)
2294 {
2295         struct client_state *state = tevent_req_data(
2296                 req, struct client_state);
2297         struct ctdbd_context *ctdb = state->ctdb;
2298         struct ctdb_reply_control reply;
2299         uint32_t value;
2300         bool ret;
2301
2302         reply.rdata.opcode = request->opcode;
2303         reply.errmsg = NULL;
2304
2305         ret = ctdb_tunable_get_value(&ctdb->tun_list,
2306                                      request->rdata.data.tun_var, &value);
2307         if (! ret) {
2308                 reply.status = -1;
2309         } else {
2310                 reply.rdata.data.tun_value = value;
2311                 reply.status = 0;
2312         }
2313
2314         client_send_control(req, header, &reply);
2315 }
2316
2317 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2318                                   struct tevent_req *req,
2319                                   struct ctdb_req_header *header,
2320                                   struct ctdb_req_control *request)
2321 {
2322         struct ctdb_reply_control reply;
2323         struct ctdb_var_list *var_list;
2324
2325         reply.rdata.opcode = request->opcode;
2326         reply.errmsg = NULL;
2327
2328         var_list = ctdb_tunable_names(mem_ctx);
2329         if (var_list == NULL) {
2330                 reply.status = -1;
2331         } else {
2332                 reply.rdata.data.tun_var_list = var_list;
2333                 reply.status = 0;
2334         }
2335
2336         client_send_control(req, header, &reply);
2337 }
2338
2339 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2340                                  struct tevent_req *req,
2341                                  struct ctdb_req_header *header,
2342                                  struct ctdb_req_control *request)
2343 {
2344         struct client_state *state = tevent_req_data(
2345                 req, struct client_state);
2346         struct ctdbd_context *ctdb = state->ctdb;
2347         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2348         struct ctdb_reply_control reply;
2349         struct node *node;
2350
2351         reply.rdata.opcode = request->opcode;
2352
2353         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2354             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2355                 DEBUG(DEBUG_INFO,
2356                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2357                 reply.status = EINVAL;
2358                 reply.errmsg = "Failed to MODIFY_FLAGS";
2359                 client_send_control(req, header, &reply);
2360                 return;
2361         }
2362
2363         /* There's all sorts of broadcast weirdness here.  Only change
2364          * the specified node, not the destination node of the
2365          * control. */
2366         node = &ctdb->node_map->node[change->pnn];
2367
2368         if ((node->flags &
2369              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2370             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2371                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2372                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2373                 goto done;
2374         }
2375
2376         if ((node->flags &
2377              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2378             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2379                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2380                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2381                 goto done;
2382         }
2383
2384         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2385
2386 done:
2387         reply.status = 0;
2388         reply.errmsg = NULL;
2389         client_send_control(req, header, &reply);
2390 }
2391
2392 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2393                                      struct tevent_req *req,
2394                                      struct ctdb_req_header *header,
2395                                      struct ctdb_req_control *request)
2396 {
2397         struct client_state *state = tevent_req_data(
2398                 req, struct client_state);
2399         struct ctdbd_context *ctdb = state->ctdb;
2400         struct ctdb_reply_control reply;
2401
2402         reply.rdata.opcode = request->opcode;
2403         reply.rdata.data.tun_list = &ctdb->tun_list;
2404         reply.status = 0;
2405         reply.errmsg = NULL;
2406
2407         client_send_control(req, header, &reply);
2408 }
2409
2410 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2411                                          struct tevent_req *req,
2412                                          struct ctdb_req_header *header,
2413                                          struct ctdb_req_control *request)
2414 {
2415         struct client_state *state = tevent_req_data(
2416                 req, struct client_state);
2417         struct ctdbd_context *ctdb = state->ctdb;
2418         struct ctdb_reply_control reply;
2419         struct database *db;
2420
2421         reply.rdata.opcode = request->opcode;
2422
2423         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2424                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2425                         goto done;
2426                 }
2427         }
2428
2429         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2430                           CTDB_DB_FLAGS_PERSISTENT);
2431         if (db == NULL) {
2432                 reply.status = -1;
2433                 reply.errmsg = "Failed to attach database";
2434                 client_send_control(req, header, &reply);
2435                 return;
2436         }
2437
2438 done:
2439         reply.rdata.data.db_id = db->id;
2440         reply.status = 0;
2441         reply.errmsg = NULL;
2442         client_send_control(req, header, &reply);
2443 }
2444
2445 static void control_uptime(TALLOC_CTX *mem_ctx,
2446                            struct tevent_req *req,
2447                            struct ctdb_req_header *header,
2448                            struct ctdb_req_control *request)
2449 {
2450         struct client_state *state = tevent_req_data(
2451                 req, struct client_state);
2452         struct ctdbd_context *ctdb = state->ctdb;
2453         struct ctdb_reply_control reply;
2454         struct ctdb_uptime *uptime;;
2455
2456         reply.rdata.opcode = request->opcode;
2457
2458         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2459         if (uptime == NULL) {
2460                 goto fail;
2461         }
2462
2463         uptime->current_time = tevent_timeval_current();
2464         uptime->ctdbd_start_time = ctdb->start_time;
2465         uptime->last_recovery_started = ctdb->recovery_start_time;
2466         uptime->last_recovery_finished = ctdb->recovery_end_time;
2467
2468         reply.rdata.data.uptime = uptime;
2469         reply.status = 0;
2470         reply.errmsg = NULL;
2471         client_send_control(req, header, &reply);
2472         return;
2473
2474 fail:
2475         reply.status = -1;
2476         reply.errmsg = "Memory error";
2477         client_send_control(req, header, &reply);
2478 }
2479
2480 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2481                                       struct tevent_req *req,
2482                                       struct ctdb_req_header *header,
2483                                       struct ctdb_req_control *request)
2484 {
2485         struct client_state *state = tevent_req_data(
2486                 req, struct client_state);
2487         struct ctdbd_context *ctdb = state->ctdb;
2488         struct ctdb_reply_control reply;
2489         struct ctdb_node_map *nodemap;
2490         struct node_map *node_map = ctdb->node_map;
2491         unsigned int i;
2492
2493         reply.rdata.opcode = request->opcode;
2494
2495         nodemap = read_nodes_file(mem_ctx, header->destnode);
2496         if (nodemap == NULL) {
2497                 goto fail;
2498         }
2499
2500         for (i=0; i<nodemap->num; i++) {
2501                 struct node *node;
2502
2503                 if (i < node_map->num_nodes &&
2504                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2505                                         &node_map->node[i].addr)) {
2506                         continue;
2507                 }
2508
2509                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2510                         int ret;
2511
2512                         node = &node_map->node[i];
2513
2514                         node->flags |= NODE_FLAGS_DELETED;
2515                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2516                                                          false);
2517                         if (ret != 0) {
2518                                 /* Can't happen, but Coverity... */
2519                                 goto fail;
2520                         }
2521
2522                         continue;
2523                 }
2524
2525                 if (i < node_map->num_nodes &&
2526                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2527                         node = &node_map->node[i];
2528
2529                         node->flags &= ~NODE_FLAGS_DELETED;
2530                         node->addr = nodemap->node[i].addr;
2531
2532                         continue;
2533                 }
2534
2535                 node_map->node = talloc_realloc(node_map, node_map->node,
2536                                                 struct node,
2537                                                 node_map->num_nodes+1);
2538                 if (node_map->node == NULL) {
2539                         goto fail;
2540                 }
2541                 node = &node_map->node[node_map->num_nodes];
2542
2543                 node->addr = nodemap->node[i].addr;
2544                 node->pnn = nodemap->node[i].pnn;
2545                 node->flags = 0;
2546                 node->capabilities = CTDB_CAP_DEFAULT;
2547                 node->recovery_disabled = false;
2548                 node->recovery_substate = NULL;
2549
2550                 node_map->num_nodes += 1;
2551         }
2552
2553         talloc_free(nodemap);
2554
2555         reply.status = 0;
2556         reply.errmsg = NULL;
2557         client_send_control(req, header, &reply);
2558         return;
2559
2560 fail:
2561         reply.status = -1;
2562         reply.errmsg = "Memory error";
2563         client_send_control(req, header, &reply);
2564 }
2565
2566 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2567                                      struct tevent_req *req,
2568                                      struct ctdb_req_header *header,
2569                                      struct ctdb_req_control *request)
2570 {
2571         struct client_state *state = tevent_req_data(
2572                 req, struct client_state);
2573         struct ctdbd_context *ctdb = state->ctdb;
2574         struct ctdb_reply_control reply;
2575         struct node *node;
2576         uint32_t caps = 0;
2577
2578         reply.rdata.opcode = request->opcode;
2579
2580         node = &ctdb->node_map->node[header->destnode];
2581         caps = node->capabilities;
2582
2583         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2584                 /* Don't send reply */
2585                 return;
2586         }
2587
2588         reply.rdata.data.caps = caps;
2589         reply.status = 0;
2590         reply.errmsg = NULL;
2591
2592         client_send_control(req, header, &reply);
2593 }
2594
2595 static void control_release_ip(TALLOC_CTX *mem_ctx,
2596                                struct tevent_req *req,
2597                                struct ctdb_req_header *header,
2598                                struct ctdb_req_control *request)
2599 {
2600         struct client_state *state = tevent_req_data(
2601                 req, struct client_state);
2602         struct ctdbd_context *ctdb = state->ctdb;
2603         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2604         struct ctdb_reply_control reply;
2605         struct ctdb_public_ip_list *ips = NULL;
2606         struct ctdb_public_ip *t = NULL;
2607         unsigned int i;
2608
2609         reply.rdata.opcode = request->opcode;
2610
2611         if (ctdb->known_ips == NULL) {
2612                 D_INFO("RELEASE_IP %s - not a public IP\n",
2613                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2614                 goto done;
2615         }
2616
2617         ips = &ctdb->known_ips[header->destnode];
2618
2619         t = NULL;
2620         for (i = 0; i < ips->num; i++) {
2621                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2622                         t = &ips->ip[i];
2623                         break;
2624                 }
2625         }
2626         if (t == NULL) {
2627                 D_INFO("RELEASE_IP %s - not a public IP\n",
2628                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2629                 goto done;
2630         }
2631
2632         if (t->pnn != header->destnode) {
2633                 if (header->destnode == ip->pnn) {
2634                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2635                               ctdb_sock_addr_to_string(mem_ctx,
2636                                                        &ip->addr, false),
2637                               ip->pnn);
2638                         reply.status = -1;
2639                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2640                         client_send_control(req, header, &reply);
2641                         return;
2642                 }
2643
2644                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2645                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2646                        ip->pnn);
2647                 t->pnn = ip->pnn;
2648         } else {
2649                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2650                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2651                           ip->pnn);
2652                 t->pnn = ip->pnn;
2653         }
2654
2655 done:
2656         reply.status = 0;
2657         reply.errmsg = NULL;
2658         client_send_control(req, header, &reply);
2659 }
2660
2661 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2662                                 struct tevent_req *req,
2663                                 struct ctdb_req_header *header,
2664                                 struct ctdb_req_control *request)
2665 {
2666         struct client_state *state = tevent_req_data(
2667                 req, struct client_state);
2668         struct ctdbd_context *ctdb = state->ctdb;
2669         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2670         struct ctdb_reply_control reply;
2671         struct ctdb_public_ip_list *ips = NULL;
2672         struct ctdb_public_ip *t = NULL;
2673         unsigned int i;
2674
2675         reply.rdata.opcode = request->opcode;
2676
2677         if (ctdb->known_ips == NULL) {
2678                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2679                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2680                 goto done;
2681         }
2682
2683         ips = &ctdb->known_ips[header->destnode];
2684
2685         t = NULL;
2686         for (i = 0; i < ips->num; i++) {
2687                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2688                         t = &ips->ip[i];
2689                         break;
2690                 }
2691         }
2692         if (t == NULL) {
2693                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2694                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2695                 goto done;
2696         }
2697
2698         if (t->pnn == header->destnode) {
2699                 D_INFO("TAKEOVER_IP %s - redundant\n",
2700                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2701         } else {
2702                 D_NOTICE("TAKEOVER_IP %s\n",
2703                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2704                 t->pnn = ip->pnn;
2705         }
2706
2707 done:
2708         reply.status = 0;
2709         reply.errmsg = NULL;
2710         client_send_control(req, header, &reply);
2711 }
2712
2713 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2714                                    struct tevent_req *req,
2715                                    struct ctdb_req_header *header,
2716                                    struct ctdb_req_control *request)
2717 {
2718         struct client_state *state = tevent_req_data(
2719                 req, struct client_state);
2720         struct ctdbd_context *ctdb = state->ctdb;
2721         struct ctdb_reply_control reply;
2722         struct ctdb_public_ip_list *ips = NULL;
2723
2724         reply.rdata.opcode = request->opcode;
2725
2726         if (ctdb->known_ips == NULL) {
2727                 /* No IPs defined so create a dummy empty struct and ship it */
2728                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2729                 if (ips == NULL) {
2730                         reply.status = ENOMEM;
2731                         reply.errmsg = "Memory error";
2732                         goto done;
2733                 }
2734                 goto ok;
2735         }
2736
2737         ips = &ctdb->known_ips[header->destnode];
2738
2739         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2740                 /* If runstate is not RUNNING or a node is then return
2741                  * no available IPs.  Don't worry about interface
2742                  * states here - we're not faking down to that level.
2743                  */
2744                 uint32_t flags = ctdb->node_map->node[header->destnode].flags;
2745                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
2746                     ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
2747                         /* No available IPs: return dummy empty struct */
2748                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2749                         if (ips == NULL) {
2750                                 reply.status = ENOMEM;
2751                                 reply.errmsg = "Memory error";
2752                                 goto done;
2753                         }
2754                 }
2755         }
2756
2757 ok:
2758         reply.rdata.data.pubip_list = ips;
2759         reply.status = 0;
2760         reply.errmsg = NULL;
2761
2762 done:
2763         client_send_control(req, header, &reply);
2764 }
2765
2766 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2767                                 struct tevent_req *req,
2768                                 struct ctdb_req_header *header,
2769                                 struct ctdb_req_control *request)
2770 {
2771         struct client_state *state = tevent_req_data(
2772                 req, struct client_state);
2773         struct ctdbd_context *ctdb = state->ctdb;
2774         struct ctdb_reply_control reply;
2775         struct ctdb_node_map *nodemap;
2776         struct node *node;
2777         unsigned int i;
2778
2779         reply.rdata.opcode = request->opcode;
2780
2781         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2782         if (nodemap == NULL) {
2783                 goto fail;
2784         }
2785
2786         nodemap->num = ctdb->node_map->num_nodes;
2787         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2788                                      nodemap->num);
2789         if (nodemap->node == NULL) {
2790                 goto fail;
2791         }
2792
2793         for (i=0; i<nodemap->num; i++) {
2794                 node = &ctdb->node_map->node[i];
2795                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2796                         .pnn = node->pnn,
2797                         .flags = node->flags,
2798                         .addr = node->addr,
2799                 };
2800         }
2801
2802         reply.rdata.data.nodemap = nodemap;
2803         reply.status = 0;
2804         reply.errmsg = NULL;
2805         client_send_control(req, header, &reply);
2806         return;
2807
2808 fail:
2809         reply.status = -1;
2810         reply.errmsg = "Memory error";
2811         client_send_control(req, header, &reply);
2812 }
2813
2814 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2815                                      struct tevent_req *req,
2816                                      struct ctdb_req_header *header,
2817                                      struct ctdb_req_control *request)
2818 {
2819         struct client_state *state = tevent_req_data(
2820                 req, struct client_state);
2821         struct ctdbd_context *ctdb = state->ctdb;
2822         struct ctdb_reply_control reply;
2823
2824         reply.rdata.opcode = request->opcode;
2825
2826         if (ctdb->reclock != NULL) {
2827                 reply.rdata.data.reclock_file =
2828                         talloc_strdup(mem_ctx, ctdb->reclock);
2829                 if (reply.rdata.data.reclock_file == NULL) {
2830                         reply.status = ENOMEM;
2831                         reply.errmsg = "Memory error";
2832                         goto done;
2833                 }
2834         } else {
2835                 reply.rdata.data.reclock_file = NULL;
2836         }
2837
2838         reply.status = 0;
2839         reply.errmsg = NULL;
2840
2841 done:
2842         client_send_control(req, header, &reply);
2843 }
2844
2845 static void control_stop_node(TALLOC_CTX *mem_ctx,
2846                               struct tevent_req *req,
2847                               struct ctdb_req_header *header,
2848                               struct ctdb_req_control *request)
2849 {
2850         struct client_state *state = tevent_req_data(
2851                 req, struct client_state);
2852         struct ctdbd_context *ctdb = state->ctdb;
2853         struct ctdb_reply_control reply;
2854
2855         reply.rdata.opcode = request->opcode;
2856
2857         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2858         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2859
2860         reply.status = 0;
2861         reply.errmsg = NULL;
2862
2863         client_send_control(req, header, &reply);
2864         return;
2865 }
2866
2867 static void control_continue_node(TALLOC_CTX *mem_ctx,
2868                                   struct tevent_req *req,
2869                                   struct ctdb_req_header *header,
2870                                   struct ctdb_req_control *request)
2871 {
2872         struct client_state *state = tevent_req_data(
2873                 req, struct client_state);
2874         struct ctdbd_context *ctdb = state->ctdb;
2875         struct ctdb_reply_control reply;
2876
2877         reply.rdata.opcode = request->opcode;
2878
2879         DEBUG(DEBUG_INFO, ("Continue node\n"));
2880         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2881
2882         reply.status = 0;
2883         reply.errmsg = NULL;
2884
2885         client_send_control(req, header, &reply);
2886         return;
2887 }
2888
2889 static void set_ban_state_callback(struct tevent_req *subreq)
2890 {
2891         struct node *node = tevent_req_callback_data(
2892                 subreq, struct node);
2893         bool status;
2894
2895         status = tevent_wakeup_recv(subreq);
2896         TALLOC_FREE(subreq);
2897         if (! status) {
2898                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2899         }
2900
2901         node->flags &= ~NODE_FLAGS_BANNED;
2902 }
2903
2904 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2905                                   struct tevent_req *req,
2906                                   struct ctdb_req_header *header,
2907                                   struct ctdb_req_control *request)
2908 {
2909         struct client_state *state = tevent_req_data(
2910                 req, struct client_state);
2911         struct tevent_req *subreq;
2912         struct ctdbd_context *ctdb = state->ctdb;
2913         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2914         struct ctdb_reply_control reply;
2915         struct node *node;
2916
2917         reply.rdata.opcode = request->opcode;
2918
2919         if (ban->pnn != header->destnode) {
2920                 DEBUG(DEBUG_INFO,
2921                       ("SET_BAN_STATE control for PNN %d rejected\n",
2922                        ban->pnn));
2923                 reply.status = EINVAL;
2924                 goto fail;
2925         }
2926
2927         node = &ctdb->node_map->node[header->destnode];
2928
2929         if (ban->time == 0) {
2930                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2931                 node->flags &= ~NODE_FLAGS_BANNED;
2932                 goto done;
2933         }
2934
2935         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2936                                     tevent_timeval_current_ofs(
2937                                             ban->time, 0));
2938         if (subreq == NULL) {
2939                 reply.status = ENOMEM;
2940                 goto fail;
2941         }
2942         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2943
2944         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2945         node->flags |= NODE_FLAGS_BANNED;
2946         ctdb->vnn_map->generation = INVALID_GENERATION;
2947
2948 done:
2949         reply.status = 0;
2950         reply.errmsg = NULL;
2951
2952         client_send_control(req, header, &reply);
2953         return;
2954
2955 fail:
2956         reply.errmsg = "Failed to ban node";
2957 }
2958
2959 static void control_trans3_commit(TALLOC_CTX *mem_ctx,
2960                                   struct tevent_req *req,
2961                                   struct ctdb_req_header *header,
2962                                   struct ctdb_req_control *request)
2963 {
2964         struct client_state *state = tevent_req_data(
2965                 req, struct client_state);
2966         struct ctdbd_context *ctdb = state->ctdb;
2967         struct ctdb_reply_control reply;
2968         struct database *db;
2969         int ret;
2970
2971         reply.rdata.opcode = request->opcode;
2972
2973         db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
2974         if (db == NULL) {
2975                 reply.status = -1;
2976                 reply.errmsg = "Unknown database";
2977                 client_send_control(req, header, &reply);
2978                 return;
2979         }
2980
2981         if (! (db->flags &
2982                (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
2983                 reply.status = -1;
2984                 reply.errmsg = "Transactions on volatile database";
2985                 client_send_control(req, header, &reply);
2986                 return;
2987         }
2988
2989         ret = ltdb_transaction(db, request->rdata.data.recbuf);
2990         if (ret != 0) {
2991                 reply.status = -1;
2992                 reply.errmsg = "Transaction failed";
2993                 client_send_control(req, header, &reply);
2994                 return;
2995         }
2996
2997         reply.status = 0;
2998         reply.errmsg = NULL;
2999         client_send_control(req, header, &reply);
3000 }
3001
3002 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
3003                                struct tevent_req *req,
3004                                struct ctdb_req_header *header,
3005                                struct ctdb_req_control *request)
3006 {
3007         struct client_state *state = tevent_req_data(
3008                 req, struct client_state);
3009         struct ctdbd_context *ctdb = state->ctdb;
3010         struct ctdb_reply_control reply;
3011         struct database *db;
3012         int ret;
3013
3014         reply.rdata.opcode = request->opcode;
3015
3016         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3017         if (db == NULL) {
3018                 reply.status = ENOENT;
3019                 reply.errmsg = "Database not found";
3020         } else {
3021                 uint64_t seqnum;
3022
3023                 ret = database_seqnum(db, &seqnum);
3024                 if (ret == 0) {
3025                         reply.rdata.data.seqnum = seqnum;
3026                         reply.status = 0;
3027                         reply.errmsg = NULL;
3028                 } else {
3029                         reply.status = ret;
3030                         reply.errmsg = "Failed to get seqnum";
3031                 }
3032         }
3033
3034         client_send_control(req, header, &reply);
3035 }
3036
3037 static void control_db_get_health(TALLOC_CTX *mem_ctx,
3038                                   struct tevent_req *req,
3039                                   struct ctdb_req_header *header,
3040                                   struct ctdb_req_control *request)
3041 {
3042         struct client_state *state = tevent_req_data(
3043                 req, struct client_state);
3044         struct ctdbd_context *ctdb = state->ctdb;
3045         struct ctdb_reply_control reply;
3046         struct database *db;
3047
3048         reply.rdata.opcode = request->opcode;
3049
3050         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3051         if (db == NULL) {
3052                 reply.status = ENOENT;
3053                 reply.errmsg = "Database not found";
3054         } else {
3055                 reply.rdata.data.reason = NULL;
3056                 reply.status = 0;
3057                 reply.errmsg = NULL;
3058         }
3059
3060         client_send_control(req, header, &reply);
3061 }
3062
3063 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
3064                                                    struct ctdbd_context *ctdb)
3065 {
3066         struct ctdb_iface_list *iface_list;
3067         struct interface *iface;
3068         unsigned int i;
3069
3070         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
3071         if (iface_list == NULL) {
3072                 goto done;
3073         }
3074
3075         iface_list->num = ctdb->iface_map->num;
3076         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
3077                                          iface_list->num);
3078         if (iface_list->iface == NULL) {
3079                 TALLOC_FREE(iface_list);
3080                 goto done;
3081         }
3082
3083         for (i=0; i<iface_list->num; i++) {
3084                 iface = &ctdb->iface_map->iface[i];
3085                 iface_list->iface[i] = (struct ctdb_iface) {
3086                         .link_state = iface->link_up,
3087                         .references = iface->references,
3088                 };
3089                 strlcpy(iface_list->iface[i].name, iface->name,
3090                         sizeof(iface_list->iface[i].name));
3091         }
3092
3093 done:
3094         return iface_list;
3095 }
3096
3097 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
3098                                        struct tevent_req *req,
3099                                        struct ctdb_req_header *header,
3100                                        struct ctdb_req_control *request)
3101 {
3102         struct client_state *state = tevent_req_data(
3103                 req, struct client_state);
3104         struct ctdbd_context *ctdb = state->ctdb;
3105         struct ctdb_reply_control reply;
3106         ctdb_sock_addr *addr = request->rdata.data.addr;
3107         struct ctdb_public_ip_list *known = NULL;
3108         struct ctdb_public_ip_info *info = NULL;
3109         unsigned i;
3110
3111         reply.rdata.opcode = request->opcode;
3112
3113         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
3114         if (info == NULL) {
3115                 reply.status = ENOMEM;
3116                 reply.errmsg = "Memory error";
3117                 goto done;
3118         }
3119
3120         reply.rdata.data.ipinfo = info;
3121
3122         if (ctdb->known_ips != NULL) {
3123                 known = &ctdb->known_ips[header->destnode];
3124         } else {
3125                 /* No IPs defined so create a dummy empty struct and
3126                  * fall through.  The given IP won't be matched
3127                  * below...
3128                  */
3129                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
3130                 if (known == NULL) {
3131                         reply.status = ENOMEM;
3132                         reply.errmsg = "Memory error";
3133                         goto done;
3134                 }
3135         }
3136
3137         for (i = 0; i < known->num; i++) {
3138                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
3139                                            addr)) {
3140                         break;
3141                 }
3142         }
3143
3144         if (i == known->num) {
3145                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
3146                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
3147                 reply.status = -1;
3148                 reply.errmsg = "Unknown address";
3149                 goto done;
3150         }
3151
3152         info->ip = known->ip[i];
3153
3154         /* The fake PUBLICIPS stanza and resulting known_ips data
3155          * don't know anything about interfaces, so completely fake
3156          * this.
3157          */
3158         info->active_idx = 0;
3159
3160         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
3161         if (info->ifaces == NULL) {
3162                 reply.status = ENOMEM;
3163                 reply.errmsg = "Memory error";
3164                 goto done;
3165         }
3166
3167         reply.status = 0;
3168         reply.errmsg = NULL;
3169
3170 done:
3171         client_send_control(req, header, &reply);
3172 }
3173
3174 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
3175                                struct tevent_req *req,
3176                                struct ctdb_req_header *header,
3177                                struct ctdb_req_control *request)
3178 {
3179         struct client_state *state = tevent_req_data(
3180                 req, struct client_state);
3181         struct ctdbd_context *ctdb = state->ctdb;
3182         struct ctdb_reply_control reply;
3183         struct ctdb_iface_list *iface_list;
3184
3185         reply.rdata.opcode = request->opcode;
3186
3187         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
3188         if (iface_list == NULL) {
3189                 goto fail;
3190         }
3191
3192         reply.rdata.data.iface_list = iface_list;
3193         reply.status = 0;
3194         reply.errmsg = NULL;
3195         client_send_control(req, header, &reply);
3196         return;
3197
3198 fail:
3199         reply.status = -1;
3200         reply.errmsg = "Memory error";
3201         client_send_control(req, header, &reply);
3202 }
3203
3204 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
3205                                          struct tevent_req *req,
3206                                          struct ctdb_req_header *header,
3207                                          struct ctdb_req_control *request)
3208 {
3209         struct client_state *state = tevent_req_data(
3210                 req, struct client_state);
3211         struct ctdbd_context *ctdb = state->ctdb;
3212         struct ctdb_reply_control reply;
3213         struct ctdb_iface *in_iface;
3214         struct interface *iface = NULL;
3215         bool link_up = false;
3216         int i;
3217
3218         reply.rdata.opcode = request->opcode;
3219
3220         in_iface = request->rdata.data.iface;
3221
3222         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
3223                 reply.errmsg = "interface name not terminated";
3224                 goto fail;
3225         }
3226
3227         switch (in_iface->link_state) {
3228                 case 0:
3229                         link_up = false;
3230                         break;
3231
3232                 case 1:
3233                         link_up = true;
3234                         break;
3235
3236                 default:
3237                         reply.errmsg = "invalid link state";
3238                         goto fail;
3239         }
3240
3241         if (in_iface->references != 0) {
3242                 reply.errmsg = "references should be 0";
3243                 goto fail;
3244         }
3245
3246         for (i=0; i<ctdb->iface_map->num; i++) {
3247                 if (strcmp(ctdb->iface_map->iface[i].name,
3248                            in_iface->name) == 0) {
3249                         iface = &ctdb->iface_map->iface[i];
3250                         break;
3251                 }
3252         }
3253
3254         if (iface == NULL) {
3255                 reply.errmsg = "interface not found";
3256                 goto fail;
3257         }
3258
3259         iface->link_up = link_up;
3260
3261         reply.status = 0;
3262         reply.errmsg = NULL;
3263         client_send_control(req, header, &reply);
3264         return;
3265
3266 fail:
3267         reply.status = -1;
3268         client_send_control(req, header, &reply);
3269 }
3270
3271 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
3272                                     struct tevent_req *req,
3273                                     struct ctdb_req_header *header,
3274                                     struct ctdb_req_control *request)
3275 {
3276         struct client_state *state = tevent_req_data(
3277                 req, struct client_state);
3278         struct ctdbd_context *ctdb = state->ctdb;
3279         struct ctdb_reply_control reply;
3280         struct database *db;
3281
3282         reply.rdata.opcode = request->opcode;
3283
3284         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3285         if (db == NULL) {
3286                 reply.status = ENOENT;
3287                 reply.errmsg = "Database not found";
3288                 goto done;
3289         }
3290
3291         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3292                 reply.status = EINVAL;
3293                 reply.errmsg = "Can not set READONLY on persistent db";
3294                 goto done;
3295         }
3296
3297         db->flags |= CTDB_DB_FLAGS_READONLY;
3298         reply.status = 0;
3299         reply.errmsg = NULL;
3300
3301 done:
3302         client_send_control(req, header, &reply);
3303 }
3304
/*
 * State shared between control_traverse_start_ext() and the per-record
 * callback traverse_start_ext_handler().
 */
struct traverse_start_ext_state {
	/* Request and header handed to client_send_message() */
	struct tevent_req *req;
	struct ctdb_req_header *header;
	/* Request ID stamped onto every record that is sent */
	uint32_t reqid;
	/* Service ID the record messages are addressed to */
	uint64_t srvid;
	/* Also send records that consist of nothing but the header */
	bool withemptyrecords;
	/* Set to an errno value by the handler on failure, 0 otherwise */
	int status;
};
3313
3314 static int traverse_start_ext_handler(struct tdb_context *tdb,
3315                                       TDB_DATA key, TDB_DATA data,
3316                                       void *private_data)
3317 {
3318         struct traverse_start_ext_state *state =
3319                 (struct traverse_start_ext_state *)private_data;
3320         struct ctdb_rec_data rec;
3321         struct ctdb_req_message_data message;
3322         size_t np;
3323
3324         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3325                 return 0;
3326         }
3327
3328         if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
3329             (!state->withemptyrecords)) {
3330                 return 0;
3331         }
3332
3333         rec = (struct ctdb_rec_data) {
3334                 .reqid = state->reqid,
3335                 .header = NULL,
3336                 .key = key,
3337                 .data = data,
3338         };
3339
3340         message.srvid = state->srvid;
3341         message.data.dsize = ctdb_rec_data_len(&rec);
3342         message.data.dptr = talloc_size(state->req, message.data.dsize);
3343         if (message.data.dptr == NULL) {
3344                 state->status = ENOMEM;
3345                 return 1;
3346         }
3347
3348         ctdb_rec_data_push(&rec, message.data.dptr, &np);
3349         client_send_message(state->req, state->header, &message);
3350
3351         talloc_free(message.data.dptr);
3352
3353         return 0;
3354 }
3355
3356 static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
3357                                        struct tevent_req *req,
3358                                        struct ctdb_req_header *header,
3359                                        struct ctdb_req_control *request)
3360 {
3361         struct client_state *state = tevent_req_data(
3362                 req, struct client_state);
3363         struct ctdbd_context *ctdb = state->ctdb;
3364         struct ctdb_reply_control reply;
3365         struct database *db;
3366         struct ctdb_traverse_start_ext *ext;
3367         struct traverse_start_ext_state t_state;
3368         struct ctdb_rec_data rec;
3369         struct ctdb_req_message_data message;
3370         uint8_t buffer[32];
3371         size_t np;
3372         int ret;
3373
3374         reply.rdata.opcode = request->opcode;
3375
3376         ext = request->rdata.data.traverse_start_ext;
3377
3378         db = database_find(ctdb->db_map, ext->db_id);
3379         if (db == NULL) {
3380                 reply.status = -1;
3381                 reply.errmsg = "Unknown database";
3382                 client_send_control(req, header, &reply);
3383                 return;
3384         }
3385
3386         t_state = (struct traverse_start_ext_state) {
3387                 .req = req,
3388                 .header = header,
3389                 .reqid = ext->reqid,
3390                 .srvid = ext->srvid,
3391                 .withemptyrecords = ext->withemptyrecords,
3392         };
3393
3394         ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
3395         DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
3396         if (t_state.status != 0) {
3397                 reply.status = -1;
3398                 reply.errmsg = "Memory error";
3399                 client_send_control(req, header, &reply);
3400         }
3401
3402         reply.status = 0;
3403         client_send_control(req, header, &reply);
3404
3405         rec = (struct ctdb_rec_data) {
3406                 .reqid = ext->reqid,
3407                 .header = NULL,
3408                 .key = tdb_null,
3409                 .data = tdb_null,
3410         };
3411
3412         message.srvid = ext->srvid;
3413         message.data.dsize = ctdb_rec_data_len(&rec);
3414         ctdb_rec_data_push(&rec, buffer, &np);
3415         message.data.dptr = buffer;
3416         client_send_message(req, header, &message);
3417 }
3418
3419 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3420                                     struct tevent_req *req,
3421                                     struct ctdb_req_header *header,
3422                                     struct ctdb_req_control *request)
3423 {
3424         struct client_state *state = tevent_req_data(
3425                 req, struct client_state);
3426         struct ctdbd_context *ctdb = state->ctdb;
3427         struct ctdb_reply_control reply;
3428         struct database *db;
3429
3430         reply.rdata.opcode = request->opcode;
3431
3432         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3433         if (db == NULL) {
3434                 reply.status = ENOENT;
3435                 reply.errmsg = "Database not found";
3436                 goto done;
3437         }
3438
3439         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3440                 reply.status = EINVAL;
3441                 reply.errmsg = "Can not set STICKY on persistent db";
3442                 goto done;
3443         }
3444
3445         db->flags |= CTDB_DB_FLAGS_STICKY;
3446         reply.status = 0;
3447         reply.errmsg = NULL;
3448
3449 done:
3450         client_send_control(req, header, &reply);
3451 }
3452
3453 static void control_start_ipreallocate(TALLOC_CTX *mem_ctx,
3454                                        struct tevent_req *req,
3455                                        struct ctdb_req_header *header,
3456                                        struct ctdb_req_control *request)
3457 {
3458         struct ctdb_reply_control reply;
3459
3460         /* Always succeed */
3461         reply.rdata.opcode = request->opcode;
3462         reply.status = 0;
3463         reply.errmsg = NULL;
3464
3465         client_send_control(req, header, &reply);
3466 }
3467
3468 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3469                                   struct tevent_req *req,
3470                                   struct ctdb_req_header *header,
3471                                   struct ctdb_req_control *request)
3472 {
3473         struct ctdb_reply_control reply;
3474
3475         /* Always succeed */
3476         reply.rdata.opcode = request->opcode;
3477         reply.status = 0;
3478         reply.errmsg = NULL;
3479
3480         client_send_control(req, header, &reply);
3481 }
3482
3483 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3484                                  struct tevent_req *req,
3485                                  struct ctdb_req_header *header,
3486                                  struct ctdb_req_control *request)
3487 {
3488         struct client_state *state = tevent_req_data(
3489                 req, struct client_state);
3490         struct ctdbd_context *ctdb = state->ctdb;
3491         struct ctdb_reply_control reply;
3492
3493         reply.rdata.opcode = request->opcode;
3494         reply.rdata.data.runstate = ctdb->runstate;
3495         reply.status = 0;
3496         reply.errmsg = NULL;
3497
3498         client_send_control(req, header, &reply);
3499 }
3500
3501 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3502                                    struct tevent_req *req,
3503                                    struct ctdb_req_header *header,
3504                                    struct ctdb_req_control *request)
3505 {
3506         struct ctdb_reply_control reply;
3507         struct ctdb_node_map *nodemap;
3508
3509         reply.rdata.opcode = request->opcode;
3510
3511         nodemap = read_nodes_file(mem_ctx, header->destnode);
3512         if (nodemap == NULL) {
3513                 goto fail;
3514         }
3515
3516         reply.rdata.data.nodemap = nodemap;
3517         reply.status = 0;
3518         reply.errmsg = NULL;
3519         client_send_control(req, header, &reply);
3520         return;
3521
3522 fail:
3523         reply.status = -1;
3524         reply.errmsg = "Failed to read nodes file";
3525         client_send_control(req, header, &reply);
3526 }
3527
3528 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3529                                   struct tevent_req *req,
3530                                   struct ctdb_req_header *header,
3531                                   struct ctdb_req_control *request)
3532 {
3533         struct client_state *state = tevent_req_data(
3534                 req, struct client_state);
3535         struct ctdbd_context *ctdb = state->ctdb;
3536         struct ctdb_reply_control reply;
3537         struct database *db;
3538
3539         reply.rdata.opcode = request->opcode;
3540
3541         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3542         if (db == NULL) {
3543                 reply.status = ENOENT;
3544                 reply.errmsg = "Database not found";
3545         } else {
3546                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3547                 reply.status = 0;
3548                 reply.errmsg = NULL;
3549         }
3550
3551         client_send_control(req, header, &reply);
3552 }
3553
3554 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3555                                          struct tevent_req *req,
3556                                          struct ctdb_req_header *header,
3557                                          struct ctdb_req_control *request)
3558 {
3559         struct client_state *state = tevent_req_data(
3560                 req, struct client_state);
3561         struct ctdbd_context *ctdb = state->ctdb;
3562         struct ctdb_reply_control reply;
3563         struct database *db;
3564
3565         reply.rdata.opcode = request->opcode;
3566
3567         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3568                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3569                         goto done;
3570                 }
3571         }
3572
3573         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3574                           CTDB_DB_FLAGS_REPLICATED);
3575         if (db == NULL) {
3576                 reply.status = -1;
3577                 reply.errmsg = "Failed to attach database";
3578                 client_send_control(req, header, &reply);
3579                 return;
3580         }
3581
3582 done:
3583         reply.rdata.data.db_id = db->id;
3584         reply.status = 0;
3585         reply.errmsg = NULL;
3586         client_send_control(req, header, &reply);
3587 }
3588
3589 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3590                                     struct tevent_req *req,
3591                                     struct ctdb_req_header *header,
3592                                     struct ctdb_req_control *request)
3593 {
3594         struct client_state *state = tevent_req_data(
3595                 req, struct client_state);
3596         struct ctdbd_context *ctdb = state->ctdb;
3597         struct ctdb_client *client;
3598         struct client_state *cstate;
3599         struct ctdb_reply_control reply;
3600         bool pid_found, srvid_found;
3601         int ret;
3602
3603         reply.rdata.opcode = request->opcode;
3604
3605         pid_found = false;
3606         srvid_found = false;
3607
3608         for (client=ctdb->client_list; client != NULL; client=client->next) {
3609                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3610                         pid_found = true;
3611                         cstate = (struct client_state *)client->state;
3612                         ret = srvid_exists(ctdb->srv,
3613                                            request->rdata.data.pid_srvid->srvid,
3614                                            cstate);
3615                         if (ret == 0) {
3616                                 srvid_found = true;
3617                                 ret = kill(cstate->pid, 0);
3618                                 if (ret != 0) {
3619                                         reply.status = ret;
3620                                         reply.errmsg = strerror(errno);
3621                                 } else {
3622                                         reply.status = 0;
3623                                         reply.errmsg = NULL;
3624                                 }
3625                         }
3626                 }
3627         }
3628
3629         if (! pid_found) {
3630                 reply.status = -1;
3631                 reply.errmsg = "No client for PID";
3632         } else if (! srvid_found) {
3633                 reply.status = -1;
3634                 reply.errmsg = "No client for PID and SRVID";
3635         }
3636
3637         client_send_control(req, header, &reply);
3638 }
3639
3640 static void control_disable_node(TALLOC_CTX *mem_ctx,
3641                                  struct tevent_req *req,
3642                                  struct ctdb_req_header *header,
3643                                  struct ctdb_req_control *request)
3644 {
3645         struct client_state *state = tevent_req_data(
3646                 req, struct client_state);
3647         struct ctdbd_context *ctdb = state->ctdb;
3648         struct ctdb_reply_control reply;
3649
3650         reply.rdata.opcode = request->opcode;
3651
3652         DEBUG(DEBUG_INFO, ("Disabling node\n"));
3653         ctdb->node_map->node[header->destnode].flags |=
3654                 NODE_FLAGS_PERMANENTLY_DISABLED;
3655
3656         reply.status = 0;
3657         reply.errmsg = NULL;
3658
3659         client_send_control(req, header, &reply);
3660         return;
3661 }
3662
3663 static void control_enable_node(TALLOC_CTX *mem_ctx,
3664                                   struct tevent_req *req,
3665                                   struct ctdb_req_header *header,
3666                                   struct ctdb_req_control *request)
3667 {
3668         struct client_state *state = tevent_req_data(
3669                 req, struct client_state);
3670         struct ctdbd_context *ctdb = state->ctdb;
3671         struct ctdb_reply_control reply;
3672
3673         reply.rdata.opcode = request->opcode;
3674
3675         DEBUG(DEBUG_INFO, ("Enable node\n"));
3676         ctdb->node_map->node[header->destnode].flags &=
3677                 ~NODE_FLAGS_PERMANENTLY_DISABLED;
3678
3679         reply.status = 0;
3680         reply.errmsg = NULL;
3681
3682         client_send_control(req, header, &reply);
3683         return;
3684 }
3685
3686 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3687                                  struct tevent_req *req,
3688                                  struct ctdb_req_header *header,
3689                                  struct ctdb_req_control *request)
3690 {
3691         struct client_state *state = tevent_req_data(
3692                 req, struct client_state);
3693         struct ctdbd_context *ctdb = state->ctdb;
3694         struct ctdb_reply_control reply;
3695         struct fake_control_failure *f = NULL;
3696
3697         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3698                 request->opcode, header->destnode);
3699         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3700                 if (f->opcode == request->opcode &&
3701                     (f->pnn == header->destnode ||
3702                      f->pnn == CTDB_UNKNOWN_PNN)) {
3703
3704                         reply.rdata.opcode = request->opcode;
3705                         if (strcmp(f->error, "TIMEOUT") == 0) {
3706                                 /* Causes no reply */
3707                                 D_ERR("Control %u fake timeout on node %u\n",
3708                                       request->opcode, header->destnode);
3709                                 return true;
3710                         } else if (strcmp(f->error, "ERROR") == 0) {
3711                                 D_ERR("Control %u fake error on node %u\n",
3712                                       request->opcode, header->destnode);
3713                                 reply.status = -1;
3714                                 reply.errmsg = f->comment;
3715                                 client_send_control(req, header, &reply);
3716                                 return true;
3717                         }
3718                 }
3719         }
3720
3721         return false;
3722 }
3723
3724 static void control_error(TALLOC_CTX *mem_ctx,
3725                           struct tevent_req *req,
3726                           struct ctdb_req_header *header,
3727                           struct ctdb_req_control *request)
3728 {
3729         struct ctdb_reply_control reply;
3730
3731         D_DEBUG("Control %u not implemented\n", request->opcode);
3732
3733         reply.rdata.opcode = request->opcode;
3734         reply.status = -1;
3735         reply.errmsg = "Not implemented";
3736
3737         client_send_control(req, header, &reply);
3738 }
3739
3740 /*
3741  * Handling protocol - messages
3742  */
3743
/*
 * State attached to the wakeup timer that re-enables recoveries.
 * It is stored in node->recovery_substate while the timer is pending.
 */
struct disable_recoveries_state {
        /* Node whose recovery_disabled flag is cleared on expiry */
        struct node *node;
};
3747
3748 static void disable_recoveries_callback(struct tevent_req *subreq)
3749 {
3750         struct disable_recoveries_state *substate = tevent_req_callback_data(
3751                 subreq, struct disable_recoveries_state);
3752         bool status;
3753
3754         status = tevent_wakeup_recv(subreq);
3755         TALLOC_FREE(subreq);
3756         if (! status) {
3757                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3758         }
3759
3760         substate->node->recovery_disabled = false;
3761         TALLOC_FREE(substate->node->recovery_substate);
3762 }
3763
3764 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3765                                        struct tevent_req *req,
3766                                        struct ctdb_req_header *header,
3767                                        struct ctdb_req_message *request)
3768 {
3769         struct client_state *state = tevent_req_data(
3770                 req, struct client_state);
3771         struct tevent_req *subreq;
3772         struct ctdbd_context *ctdb = state->ctdb;
3773         struct disable_recoveries_state *substate;
3774         struct ctdb_disable_message *disable = request->data.disable;
3775         struct ctdb_req_message_data reply;
3776         struct node *node;
3777         int ret = -1;
3778         TDB_DATA data;
3779
3780         node = &ctdb->node_map->node[header->destnode];
3781
3782         if (disable->timeout == 0) {
3783                 TALLOC_FREE(node->recovery_substate);
3784                 node->recovery_disabled = false;
3785                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3786                                    header->destnode));
3787                 goto done;
3788         }
3789
3790         substate = talloc_zero(ctdb->node_map,
3791                                struct disable_recoveries_state);
3792         if (substate == NULL) {
3793                 goto fail;
3794         }
3795
3796         substate->node = node;
3797
3798         subreq = tevent_wakeup_send(substate, state->ev,
3799                                     tevent_timeval_current_ofs(
3800                                             disable->timeout, 0));
3801         if (subreq == NULL) {
3802                 talloc_free(substate);
3803                 goto fail;
3804         }
3805         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3806
3807         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3808                            disable->timeout, header->destnode));
3809         node->recovery_substate = substate;
3810         node->recovery_disabled = true;
3811
3812 done:
3813         ret = header->destnode;
3814
3815 fail:
3816         reply.srvid = disable->srvid;
3817         data.dptr = (uint8_t *)&ret;
3818         data.dsize = sizeof(int);
3819         reply.data = data;
3820
3821         client_send_message(req, header, &reply);
3822 }
3823
3824 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3825                                  struct tevent_req *req,
3826                                  struct ctdb_req_header *header,
3827                                  struct ctdb_req_message *request)
3828 {
3829         struct client_state *state = tevent_req_data(
3830                 req, struct client_state);
3831         struct ctdbd_context *ctdb = state->ctdb;
3832         struct ctdb_srvid_message *srvid = request->data.msg;
3833         struct ctdb_req_message_data reply;
3834         int ret = -1;
3835         TDB_DATA data;
3836
3837         if (header->destnode != ctdb->node_map->recmaster) {
3838                 /* No reply! Only recmaster replies... */
3839                 return;
3840         }
3841
3842         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3843                            header->destnode));
3844         ret = header->destnode;
3845
3846         reply.srvid = srvid->srvid;
3847         data.dptr = (uint8_t *)&ret;
3848         data.dsize = sizeof(int);
3849         reply.data = data;
3850
3851         client_send_message(req, header, &reply);
3852 }
3853
3854 /*
3855  * Handle a single client
3856  */
3857
/*
 * Forward declarations for the per-client handlers.
 *
 * client_read_handler() and client_dead_handler() are passed to
 * comm_setup() by client_send().  client_read_handler() feeds each
 * packet to client_process_packet(), which dispatches on the header
 * operation to the call, message or control variant.
 */
static void client_read_handler(uint8_t *buf, size_t buflen,
                                void *private_data);
static void client_dead_handler(void *private_data);
static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen);
static void client_process_call(struct tevent_req *req,
                                uint8_t *buf, size_t buflen);
static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
static void client_reply_done(struct tevent_req *subreq);
3870
3871 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3872                                       struct tevent_context *ev,
3873                                       int fd, struct ctdbd_context *ctdb,
3874                                       int pnn)
3875 {
3876         struct tevent_req *req;
3877         struct client_state *state;
3878         int ret;
3879
3880         req = tevent_req_create(mem_ctx, &state, struct client_state);
3881         if (req == NULL) {
3882                 return NULL;
3883         }
3884
3885         state->ev = ev;
3886         state->fd = fd;
3887         state->ctdb = ctdb;
3888         state->pnn = pnn;
3889
3890         (void) ctdb_get_peer_pid(fd, &state->pid);
3891
3892         ret = comm_setup(state, ev, fd, client_read_handler, req,
3893                          client_dead_handler, req, &state->comm);
3894         if (ret != 0) {
3895                 tevent_req_error(req, ret);
3896                 return tevent_req_post(req, ev);
3897         }
3898
3899         ret = client_add(ctdb, state->pid, state);
3900         if (ret != 0) {
3901                 tevent_req_error(req, ret);
3902                 return tevent_req_post(req, ev);
3903         }
3904
3905         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3906
3907         return req;
3908 }
3909
3910 static void client_read_handler(uint8_t *buf, size_t buflen,
3911                                 void *private_data)
3912 {
3913         struct tevent_req *req = talloc_get_type_abort(
3914                 private_data, struct tevent_req);
3915         struct client_state *state = tevent_req_data(
3916                 req, struct client_state);
3917         struct ctdbd_context *ctdb = state->ctdb;
3918         struct ctdb_req_header header;
3919         size_t np;
3920         unsigned int i;
3921         int ret;
3922
3923         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3924         if (ret != 0) {
3925                 return;
3926         }
3927
3928         if (buflen != header.length) {
3929                 return;
3930         }
3931
3932         ret = ctdb_req_header_verify(&header, 0);
3933         if (ret != 0) {
3934                 return;
3935         }
3936
3937         header_fix_pnn(&header, ctdb);
3938
3939         if (header.destnode == CTDB_BROADCAST_ALL) {
3940                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3941                         header.destnode = i;
3942
3943                         ctdb_req_header_push(&header, buf, &np);
3944                         client_process_packet(req, buf, buflen);
3945                 }
3946                 return;
3947         }
3948
3949         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3950                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3951                         if (ctdb->node_map->node[i].flags &
3952                             NODE_FLAGS_DISCONNECTED) {
3953                                 continue;
3954                         }
3955
3956                         header.destnode = i;
3957
3958                         ctdb_req_header_push(&header, buf, &np);
3959                         client_process_packet(req, buf, buflen);
3960                 }
3961                 return;
3962         }
3963
3964         if (header.destnode > ctdb->node_map->num_nodes) {
3965                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3966                         header.destnode);
3967                 return;
3968         }
3969
3970
3971         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3972                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3973                         header.destnode);
3974                 return;
3975         }
3976
3977         ctdb_req_header_push(&header, buf, &np);
3978         client_process_packet(req, buf, buflen);
3979 }
3980
3981 static void client_dead_handler(void *private_data)
3982 {
3983         struct tevent_req *req = talloc_get_type_abort(
3984                 private_data, struct tevent_req);
3985
3986         tevent_req_done(req);
3987 }
3988
3989 static void client_process_packet(struct tevent_req *req,
3990                                   uint8_t *buf, size_t buflen)
3991 {
3992         struct ctdb_req_header header;
3993         size_t np;
3994         int ret;
3995
3996         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3997         if (ret != 0) {
3998                 return;
3999         }
4000
4001         switch (header.operation) {
4002         case CTDB_REQ_CALL:
4003                 client_process_call(req, buf, buflen);
4004                 break;
4005
4006         case CTDB_REQ_MESSAGE:
4007                 client_process_message(req, buf, buflen);
4008                 break;
4009
4010         case CTDB_REQ_CONTROL:
4011                 client_process_control(req, buf, buflen);
4012                 break;
4013
4014         default:
4015                 break;
4016         }
4017 }
4018
4019 static void client_process_call(struct tevent_req *req,
4020                                 uint8_t *buf, size_t buflen)
4021 {
4022         struct client_state *state = tevent_req_data(
4023                 req, struct client_state);
4024         struct ctdbd_context *ctdb = state->ctdb;
4025         TALLOC_CTX *mem_ctx;
4026         struct ctdb_req_header header;
4027         struct ctdb_req_call request;
4028         struct ctdb_reply_call reply;
4029         struct database *db;
4030         struct ctdb_ltdb_header hdr;
4031         TDB_DATA data;
4032         int ret;
4033
4034         mem_ctx = talloc_new(state);
4035         if (tevent_req_nomem(mem_ctx, req)) {
4036                 return;
4037         }
4038
4039         ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
4040         if (ret != 0) {
4041                 talloc_free(mem_ctx);
4042                 tevent_req_error(req, ret);
4043                 return;
4044         }
4045
4046         header_fix_pnn(&header, ctdb);
4047
4048         if (header.destnode >= ctdb->node_map->num_nodes) {
4049                 goto fail;
4050         }
4051
4052         DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
4053
4054         db = database_find(ctdb->db_map, request.db_id);
4055         if (db == NULL) {
4056                 goto fail;
4057         }
4058
4059         ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
4060         if (ret != 0) {
4061                 goto fail;
4062         }
4063
4064         /* Fake migration */
4065         if (hdr.dmaster != ctdb->node_map->pnn) {
4066                 hdr.dmaster = ctdb->node_map->pnn;
4067
4068                 ret = ltdb_store(db, request.key, &hdr, data);
4069                 if (ret != 0) {
4070                         goto fail;
4071                 }
4072         }
4073
4074         talloc_free(mem_ctx);
4075
4076         reply.status = 0;
4077         reply.data = tdb_null;
4078
4079         client_send_call(req, &header, &reply);
4080         return;
4081
4082 fail:
4083         talloc_free(mem_ctx);
4084         reply.status = -1;
4085         reply.data = tdb_null;
4086
4087         client_send_call(req, &header, &reply);
4088 }
4089
4090 static void client_process_message(struct tevent_req *req,
4091                                    uint8_t *buf, size_t buflen)
4092 {
4093         struct client_state *state = tevent_req_data(
4094                 req, struct client_state);
4095         struct ctdbd_context *ctdb = state->ctdb;
4096         TALLOC_CTX *mem_ctx;
4097         struct ctdb_req_header header;
4098         struct ctdb_req_message request;
4099         uint64_t srvid;
4100         int ret;
4101
4102         mem_ctx = talloc_new(state);
4103         if (tevent_req_nomem(mem_ctx, req)) {
4104                 return;
4105         }
4106
4107         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
4108         if (ret != 0) {
4109                 talloc_free(mem_ctx);
4110                 tevent_req_error(req, ret);
4111                 return;
4112         }
4113
4114         header_fix_pnn(&header, ctdb);
4115
4116         if (header.destnode >= ctdb->node_map->num_nodes) {
4117                 /* Many messages are not replied to, so just behave as
4118                  * though this message was not received */
4119                 fprintf(stderr, "Invalid node %d\n", header.destnode);
4120                 talloc_free(mem_ctx);
4121                 return;
4122         }
4123
4124         srvid = request.srvid;
4125         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
4126
4127         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
4128                 message_disable_recoveries(mem_ctx, req, &header, &request);
4129         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
4130                 message_takeover_run(mem_ctx, req, &header, &request);
4131         } else {
4132                 D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
4133         }
4134
4135         /* check srvid */
4136         talloc_free(mem_ctx);
4137 }
4138
4139 static void client_process_control(struct tevent_req *req,
4140                                    uint8_t *buf, size_t buflen)
4141 {
4142         struct client_state *state = tevent_req_data(
4143                 req, struct client_state);
4144         struct ctdbd_context *ctdb = state->ctdb;
4145         TALLOC_CTX *mem_ctx;
4146         struct ctdb_req_header header;
4147         struct ctdb_req_control request;
4148         int ret;
4149
4150         mem_ctx = talloc_new(state);
4151         if (tevent_req_nomem(mem_ctx, req)) {
4152                 return;
4153         }
4154
4155         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
4156         if (ret != 0) {
4157                 talloc_free(mem_ctx);
4158                 tevent_req_error(req, ret);
4159                 return;
4160         }
4161
4162         header_fix_pnn(&header, ctdb);
4163
4164         if (header.destnode >= ctdb->node_map->num_nodes) {
4165                 struct ctdb_reply_control reply;
4166
4167                 reply.rdata.opcode = request.opcode;
4168                 reply.errmsg = "Invalid node";
4169                 reply.status = -1;
4170                 client_send_control(req, &header, &reply);
4171                 return;
4172         }
4173
4174         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
4175                            request.opcode, header.reqid));
4176
4177         if (fake_control_failure(mem_ctx, req, &header, &request)) {
4178                 goto done;
4179         }
4180
4181         switch (request.opcode) {
4182         case CTDB_CONTROL_PROCESS_EXISTS:
4183                 control_process_exists(mem_ctx, req, &header, &request);
4184                 break;
4185
4186         case CTDB_CONTROL_PING:
4187                 control_ping(mem_ctx, req, &header, &request);
4188                 break;
4189
4190         case CTDB_CONTROL_GETDBPATH:
4191                 control_getdbpath(mem_ctx, req, &header, &request);
4192                 break;
4193
4194         case CTDB_CONTROL_GETVNNMAP:
4195                 control_getvnnmap(mem_ctx, req, &header, &request);
4196                 break;
4197
4198         case CTDB_CONTROL_GET_DEBUG:
4199                 control_get_debug(mem_ctx, req, &header, &request);
4200                 break;
4201
4202         case CTDB_CONTROL_SET_DEBUG:
4203                 control_set_debug(mem_ctx, req, &header, &request);
4204                 break;
4205
4206         case CTDB_CONTROL_GET_DBMAP:
4207                 control_get_dbmap(mem_ctx, req, &header, &request);
4208                 break;
4209
4210         case CTDB_CONTROL_GET_RECMODE:
4211                 control_get_recmode(mem_ctx, req, &header, &request);
4212                 break;
4213
4214         case CTDB_CONTROL_SET_RECMODE:
4215                 control_set_recmode(mem_ctx, req, &header, &request);
4216                 break;
4217
4218         case CTDB_CONTROL_DB_ATTACH:
4219                 control_db_attach(mem_ctx, req, &header, &request);
4220                 break;
4221
4222         case CTDB_CONTROL_REGISTER_SRVID:
4223                 control_register_srvid(mem_ctx, req, &header, &request);
4224                 break;
4225
4226         case CTDB_CONTROL_DEREGISTER_SRVID:
4227                 control_deregister_srvid(mem_ctx, req, &header, &request);
4228                 break;
4229
4230         case CTDB_CONTROL_GET_DBNAME:
4231                 control_get_dbname(mem_ctx, req, &header, &request);
4232                 break;
4233
4234         case CTDB_CONTROL_GET_PID:
4235                 control_get_pid(mem_ctx, req, &header, &request);
4236                 break;
4237
4238         case CTDB_CONTROL_GET_PNN:
4239                 control_get_pnn(mem_ctx, req, &header, &request);
4240                 break;
4241
4242         case CTDB_CONTROL_SHUTDOWN:
4243                 control_shutdown(mem_ctx, req, &header, &request);
4244                 break;
4245
4246         case CTDB_CONTROL_SET_TUNABLE:
4247                 control_set_tunable(mem_ctx, req, &header, &request);
4248                 break;
4249
4250         case CTDB_CONTROL_GET_TUNABLE:
4251                 control_get_tunable(mem_ctx, req, &header, &request);
4252                 break;
4253
4254         case CTDB_CONTROL_LIST_TUNABLES:
4255                 control_list_tunables(mem_ctx, req, &header, &request);
4256                 break;
4257
4258         case CTDB_CONTROL_MODIFY_FLAGS:
4259                 control_modify_flags(mem_ctx, req, &header, &request);
4260                 break;
4261
4262         case CTDB_CONTROL_GET_ALL_TUNABLES:
4263                 control_get_all_tunables(mem_ctx, req, &header, &request);
4264                 break;
4265
4266         case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
4267                 control_db_attach_persistent(mem_ctx, req, &header, &request);
4268                 break;
4269
4270         case CTDB_CONTROL_UPTIME:
4271                 control_uptime(mem_ctx, req, &header, &request);
4272                 break;
4273
4274         case CTDB_CONTROL_RELOAD_NODES_FILE:
4275                 control_reload_nodes_file(mem_ctx, req, &header, &request);
4276                 break;
4277
4278         case CTDB_CONTROL_GET_CAPABILITIES:
4279                 control_get_capabilities(mem_ctx, req, &header, &request);
4280                 break;
4281
4282         case CTDB_CONTROL_RELEASE_IP:
4283                 control_release_ip(mem_ctx, req, &header, &request);
4284                 break;
4285
4286         case CTDB_CONTROL_TAKEOVER_IP:
4287                 control_takeover_ip(mem_ctx, req, &header, &request);
4288                 break;
4289
4290         case CTDB_CONTROL_GET_PUBLIC_IPS:
4291                 control_get_public_ips(mem_ctx, req, &header, &request);
4292                 break;
4293
4294         case CTDB_CONTROL_GET_NODEMAP:
4295                 control_get_nodemap(mem_ctx, req, &header, &request);
4296                 break;
4297
4298         case CTDB_CONTROL_GET_RECLOCK_FILE:
4299                 control_get_reclock_file(mem_ctx, req, &header, &request);
4300                 break;
4301
4302         case CTDB_CONTROL_STOP_NODE:
4303                 control_stop_node(mem_ctx, req, &header, &request);
4304                 break;
4305
4306         case CTDB_CONTROL_CONTINUE_NODE:
4307                 control_continue_node(mem_ctx, req, &header, &request);
4308                 break;
4309
4310         case CTDB_CONTROL_SET_BAN_STATE:
4311                 control_set_ban_state(mem_ctx, req, &header, &request);
4312                 break;
4313
4314         case CTDB_CONTROL_TRANS3_COMMIT:
4315                 control_trans3_commit(mem_ctx, req, &header, &request);
4316                 break;
4317
4318         case CTDB_CONTROL_GET_DB_SEQNUM:
4319                 control_get_db_seqnum(mem_ctx, req, &header, &request);
4320                 break;
4321
4322         case CTDB_CONTROL_DB_GET_HEALTH:
4323                 control_db_get_health(mem_ctx, req, &header, &request);
4324                 break;
4325
4326         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
4327                 control_get_public_ip_info(mem_ctx, req, &header, &request);
4328                 break;
4329
4330         case CTDB_CONTROL_GET_IFACES:
4331                 control_get_ifaces(mem_ctx, req, &header, &request);
4332                 break;
4333
4334         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
4335                 control_set_iface_link_state(mem_ctx, req, &header, &request);
4336                 break;
4337
4338         case CTDB_CONTROL_SET_DB_READONLY:
4339                 control_set_db_readonly(mem_ctx, req, &header, &request);
4340                 break;
4341
4342         case CTDB_CONTROL_TRAVERSE_START_EXT:
4343                 control_traverse_start_ext(mem_ctx, req, &header, &request);
4344                 break;
4345
4346         case CTDB_CONTROL_SET_DB_STICKY:
4347                 control_set_db_sticky(mem_ctx, req, &header, &request);
4348                 break;
4349
4350         case CTDB_CONTROL_IPREALLOCATED:
4351                 control_ipreallocated(mem_ctx, req, &header, &request);
4352                 break;
4353
4354         case CTDB_CONTROL_GET_RUNSTATE:
4355                 control_get_runstate(mem_ctx, req, &header, &request);
4356                 break;
4357
4358         case CTDB_CONTROL_GET_NODES_FILE:
4359                 control_get_nodes_file(mem_ctx, req, &header, &request);
4360                 break;
4361
4362         case CTDB_CONTROL_DB_OPEN_FLAGS:
4363                 control_db_open_flags(mem_ctx, req, &header, &request);
4364                 break;
4365
4366         case CTDB_CONTROL_DB_ATTACH_REPLICATED:
4367                 control_db_attach_replicated(mem_ctx, req, &header, &request);
4368                 break;
4369
4370         case CTDB_CONTROL_CHECK_PID_SRVID:
4371                 control_check_pid_srvid(mem_ctx, req, &header, &request);
4372                 break;
4373
4374         case CTDB_CONTROL_DISABLE_NODE:
4375                 control_disable_node(mem_ctx, req, &header, &request);
4376                 break;
4377
4378         case CTDB_CONTROL_ENABLE_NODE:
4379                 control_enable_node(mem_ctx, req, &header, &request);
4380                 break;
4381
4382         case CTDB_CONTROL_START_IPREALLOCATE:
4383                 control_start_ipreallocate(mem_ctx, req, &header, &request);
4384                 break;
4385
4386         default:
4387                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
4388                         control_error(mem_ctx, req, &header, &request);
4389                 }
4390                 break;
4391         }
4392
4393 done:
4394         talloc_free(mem_ctx);
4395 }
4396
4397 static int client_recv(struct tevent_req *req, int *perr)
4398 {
4399         struct client_state *state = tevent_req_data(
4400                 req, struct client_state);
4401         int err;
4402
4403         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
4404         close(state->fd);
4405
4406         if (tevent_req_is_unix_error(req, &err)) {
4407                 if (perr != NULL) {
4408                         *perr = err;
4409                 }
4410                 return -1;
4411         }
4412
4413         return state->status;
4414 }
4415
4416 /*
4417  * Fake CTDB server
4418  */
4419
/* State carried by the top-level server request */
struct server_state {
        /* Event context used for timers and for accepting clients */
        struct tevent_context *ev;
        /* Fake daemon context shared with every client request */
        struct ctdbd_context *ctdb;
        /* Pending timer for the next leader broadcast (may be NULL) */
        struct tevent_timer *leader_broadcast_te;
        /* Listening socket that new clients are accepted from */
        int fd;
};

/* Timer handler that announces the current leader and re-arms itself */
static void server_leader_broadcast(
        struct tevent_context *ev,
        struct tevent_timer *te,
        struct timeval current_time,
        void *private_data);

/* Completion callbacks for accepting a client and for a client finishing */
static void server_new_client(struct tevent_req *subreq);
static void server_client_done(struct tevent_req *subreq);
4433
4434 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
4435                                       struct tevent_context *ev,
4436                                       struct ctdbd_context *ctdb,
4437                                       int fd)
4438 {
4439         struct tevent_req *req, *subreq;
4440         struct server_state *state;
4441
4442         req = tevent_req_create(mem_ctx, &state, struct server_state);
4443         if (req == NULL) {
4444                 return NULL;
4445         }
4446
4447         state->ev = ev;
4448         state->ctdb = ctdb;
4449         state->fd = fd;
4450
4451         state->leader_broadcast_te = tevent_add_timer(state->ev,
4452                                                       state,
4453                                                       timeval_current_ofs(0, 0),
4454                                                       server_leader_broadcast,
4455                                                       state);
4456         if (state->leader_broadcast_te == NULL) {
4457                 DBG_WARNING("Failed to set up leader broadcast\n");
4458         }
4459
4460         subreq = accept_send(state, ev, fd);
4461         if (tevent_req_nomem(subreq, req)) {
4462                 return tevent_req_post(req, ev);
4463         }
4464         tevent_req_set_callback(subreq, server_new_client, req);
4465
4466         return req;
4467 }
4468
4469 static void server_leader_broadcast(struct tevent_context *ev,
4470                                     struct tevent_timer *te,
4471                                     struct timeval current_time,
4472                                     void *private_data)
4473 {
4474         struct server_state *state = talloc_get_type_abort(
4475                 private_data, struct server_state);
4476         struct ctdbd_context *ctdb = state->ctdb;
4477         uint32_t leader = ctdb->node_map->recmaster;
4478         TDB_DATA data;
4479         int ret;
4480
4481         if (leader == CTDB_UNKNOWN_PNN) {
4482                 goto done;
4483         }
4484
4485         data.dptr = (uint8_t *)&leader;
4486         data.dsize = sizeof(leader);
4487
4488         ret = srvid_dispatch(ctdb->srv, CTDB_SRVID_LEADER, 0, data);
4489         if (ret != 0) {
4490                 DBG_WARNING("Failed to send leader broadcast, ret=%d\n", ret);
4491         }
4492
4493 done:
4494         state->leader_broadcast_te = tevent_add_timer(state->ev,
4495                                                       state,
4496                                                       timeval_current_ofs(1, 0),
4497                                                       server_leader_broadcast,
4498                                                       state);
4499         if (state->leader_broadcast_te == NULL) {
4500                 DBG_WARNING("Failed to set up leader broadcast\n");
4501         }
4502 }
4503
4504 static void server_new_client(struct tevent_req *subreq)
4505 {
4506         struct tevent_req *req = tevent_req_callback_data(
4507                 subreq, struct tevent_req);
4508         struct server_state *state = tevent_req_data(
4509                 req, struct server_state);
4510         struct ctdbd_context *ctdb = state->ctdb;
4511         int client_fd;
4512         int ret = 0;
4513
4514         client_fd = accept_recv(subreq, NULL, NULL, &ret);
4515         TALLOC_FREE(subreq);
4516         if (client_fd == -1) {
4517                 tevent_req_error(req, ret);
4518                 return;
4519         }
4520
4521         subreq = client_send(state, state->ev, client_fd,
4522                              ctdb, ctdb->node_map->pnn);
4523         if (tevent_req_nomem(subreq, req)) {
4524                 return;
4525         }
4526         tevent_req_set_callback(subreq, server_client_done, req);
4527
4528         ctdb->num_clients += 1;
4529
4530         subreq = accept_send(state, state->ev, state->fd);
4531         if (tevent_req_nomem(subreq, req)) {
4532                 return;
4533         }
4534         tevent_req_set_callback(subreq, server_new_client, req);
4535 }
4536
4537 static void server_client_done(struct tevent_req *subreq)
4538 {
4539         struct tevent_req *req = tevent_req_callback_data(
4540                 subreq, struct tevent_req);
4541         struct server_state *state = tevent_req_data(
4542                 req, struct server_state);
4543         struct ctdbd_context *ctdb = state->ctdb;
4544         int ret = 0;
4545         int status;
4546
4547         status = client_recv(subreq, &ret);
4548         TALLOC_FREE(subreq);
4549         if (status < 0) {
4550                 tevent_req_error(req, ret);
4551                 return;
4552         }
4553
4554         ctdb->num_clients -= 1;
4555
4556         if (status == 99) {
4557                 /* Special status, to shutdown server */
4558                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
4559                 tevent_req_done(req);
4560         }
4561 }
4562
/*
 * Collect the result of the server request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the errno-style error code in *perr.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
4575
4576 /*
4577  * Main functions
4578  */
4579
4580 static int socket_init(const char *sockpath)
4581 {
4582         struct sockaddr_un addr;
4583         size_t len;
4584         int ret, fd;
4585
4586         memset(&addr, 0, sizeof(addr));
4587         addr.sun_family = AF_UNIX;
4588
4589         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
4590         if (len >= sizeof(addr.sun_path)) {
4591                 fprintf(stderr, "path too long: %s\n", sockpath);
4592                 return -1;
4593         }
4594
4595         fd = socket(AF_UNIX, SOCK_STREAM, 0);
4596         if (fd == -1) {
4597                 fprintf(stderr, "socket failed - %s\n", sockpath);
4598                 return -1;
4599         }
4600
4601         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
4602         if (ret != 0) {
4603                 fprintf(stderr, "bind failed - %s\n", sockpath);
4604                 goto fail;
4605         }
4606
4607         ret = listen(fd, 10);
4608         if (ret != 0) {
4609                 fprintf(stderr, "listen failed\n");
4610                 goto fail;
4611         }
4612
4613         DEBUG(DEBUG_INFO, ("Socket init done\n"));
4614
4615         return fd;
4616
4617 fail:
4618         if (fd != -1) {
4619                 close(fd);
4620         }
4621         return -1;
4622 }
4623
4624 static struct options {
4625         const char *dbdir;
4626         const char *sockpath;
4627         const char *pidfile;
4628         const char *debuglevel;
4629 } options;
4630
4631 static struct poptOption cmdline_options[] = {
4632         POPT_AUTOHELP
4633         { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
4634                 "Database directory", "directory" },
4635         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
4636                 "Unix domain socket path", "filename" },
4637         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
4638                 "pid file", "filename" } ,
4639         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
4640                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
4641         POPT_TABLEEND
4642 };
4643
4644 static void cleanup(void)
4645 {
4646         unlink(options.sockpath);
4647         unlink(options.pidfile);
4648 }
4649
/*
 * SIGTERM handler: remove the socket and pid file, then terminate with
 * status 0.
 *
 * _exit() is used instead of exit() because exit() is not
 * async-signal-safe: it runs atexit handlers and flushes stdio, which
 * can misbehave if the signal interrupted code holding library locks.
 * The files are removed explicitly here, so skipping the atexit hook
 * loses nothing.
 */
static void signal_handler(int sig)
{
        (void)sig;

        cleanup();
        _exit(0);
}
4655
4656 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
4657                          struct ctdbd_context *ctdb, int fd, int pfd)
4658 {
4659         struct tevent_req *req;
4660         int ret = 0;
4661         ssize_t len;
4662
4663         atexit(cleanup);
4664         signal(SIGTERM, signal_handler);
4665
4666         req = server_send(mem_ctx, ev, ctdb, fd);
4667         if (req == NULL) {
4668                 fprintf(stderr, "Memory error\n");
4669                 exit(1);
4670         }
4671
4672         len = write(pfd, &ret, sizeof(ret));
4673         if (len != sizeof(ret)) {
4674                 fprintf(stderr, "Failed to send message to parent\n");
4675                 exit(1);
4676         }
4677         close(pfd);
4678
4679         tevent_req_poll(req, ev);
4680
4681         server_recv(req, &ret);
4682         if (ret != 0) {
4683                 exit(1);
4684         }
4685 }
4686
4687 int main(int argc, const char *argv[])
4688 {
4689         TALLOC_CTX *mem_ctx;
4690         struct ctdbd_context *ctdb;
4691         struct tevent_context *ev;
4692         poptContext pc;
4693         int opt, fd, ret, pfd[2];
4694         ssize_t len;
4695         pid_t pid;
4696         FILE *fp;
4697
4698         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
4699                             POPT_CONTEXT_KEEP_FIRST);
4700         while ((opt = poptGetNextOpt(pc)) != -1) {
4701                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
4702                 exit(1);
4703         }
4704
4705         if (options.dbdir == NULL) {
4706                 fprintf(stderr, "Please specify database directory\n");
4707                 poptPrintHelp(pc, stdout, 0);
4708                 exit(1);
4709         }
4710
4711         if (options.sockpath == NULL) {
4712                 fprintf(stderr, "Please specify socket path\n");
4713                 poptPrintHelp(pc, stdout, 0);
4714                 exit(1);
4715         }
4716
4717         if (options.pidfile == NULL) {
4718                 fprintf(stderr, "Please specify pid file\n");
4719                 poptPrintHelp(pc, stdout, 0);
4720                 exit(1);
4721         }
4722
4723         mem_ctx = talloc_new(NULL);
4724         if (mem_ctx == NULL) {
4725                 fprintf(stderr, "Memory error\n");
4726                 exit(1);
4727         }
4728
4729         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4730         if (ret != 0) {
4731                 fprintf(stderr, "Invalid debug level\n");
4732                 poptPrintHelp(pc, stdout, 0);
4733                 exit(1);
4734         }
4735
4736         ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4737         if (ctdb == NULL) {
4738                 exit(1);
4739         }
4740
4741         if (! ctdbd_verify(ctdb)) {
4742                 exit(1);
4743         }
4744
4745         ev = tevent_context_init(mem_ctx);
4746         if (ev == NULL) {
4747                 fprintf(stderr, "Memory error\n");
4748                 exit(1);
4749         }
4750
4751         fd = socket_init(options.sockpath);
4752         if (fd == -1) {
4753                 exit(1);
4754         }
4755
4756         ret = pipe(pfd);
4757         if (ret != 0) {
4758                 fprintf(stderr, "Failed to create pipe\n");
4759                 cleanup();
4760                 exit(1);
4761         }
4762
4763         pid = fork();
4764         if (pid == -1) {
4765                 fprintf(stderr, "Failed to fork\n");
4766                 cleanup();
4767                 exit(1);
4768         }
4769
4770         if (pid == 0) {
4771                 /* Child */
4772                 close(pfd[0]);
4773                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4774                 exit(1);
4775         }
4776
4777         /* Parent */
4778         close(pfd[1]);
4779
4780         len = read(pfd[0], &ret, sizeof(ret));
4781         close(pfd[0]);
4782         if (len != sizeof(ret)) {
4783                 fprintf(stderr, "len = %zi\n", len);
4784                 fprintf(stderr, "Failed to get message from child\n");
4785                 kill(pid, SIGTERM);
4786                 exit(1);
4787         }
4788
4789         fp = fopen(options.pidfile, "w");
4790         if (fp == NULL) {
4791                 fprintf(stderr, "Failed to open pid file %s\n",
4792                         options.pidfile);
4793                 kill(pid, SIGTERM);
4794                 exit(1);
4795         }
4796         fprintf(fp, "%d\n", pid);
4797         fclose(fp);
4798
4799         return 0;
4800 }