2 Fake CTDB server for testing
4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45 #include "common/system.h"
47 #include "ipalloc_read_known_ips.h"
50 #define CTDB_PORT 4379
52 /* A fake flag that is only supported by some functions */
53 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
59 uint32_t capabilities;
60 bool recovery_disabled;
61 void *recovery_substate;
77 struct interface_map {
79 struct interface *iface;
90 struct database *prev, *next;
93 struct tdb_context *tdb;
104 struct fake_control_failure {
105 struct fake_control_failure *prev, *next;
106 enum ctdb_controls opcode;
113 struct ctdb_client *prev, *next;
114 struct ctdbd_context *ctdb;
119 struct ctdbd_context {
120 struct node_map *node_map;
121 struct interface_map *iface_map;
122 struct vnn_map *vnn_map;
123 struct database_map *db_map;
124 struct srvid_context *srv;
126 struct timeval start_time;
127 struct timeval recovery_start_time;
128 struct timeval recovery_end_time;
129 bool takeover_disabled;
131 enum ctdb_runstate runstate;
132 struct ctdb_tunable_list tun_list;
134 struct ctdb_public_ip_list *known_ips;
135 struct fake_control_failure *control_failures;
136 struct ctdb_client *client_list;
143 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
145 struct node_map *node_map;
147 node_map = talloc_zero(mem_ctx, struct node_map);
148 if (node_map == NULL) {
152 node_map->pnn = CTDB_UNKNOWN_PNN;
153 node_map->recmaster = CTDB_UNKNOWN_PNN;
158 /* Read a nodemap from stdin. Each line looks like:
159 * <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
160 * EOF or a blank line terminates input.
162 * By default, capabilities for each node are
163 * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER. These 2
164 * capabilities can be faked off by adding, for example,
165 * -CTDB_CAP_RECMASTER.
168 static bool nodemap_parse(struct node_map *node_map)
172 while ((fgets(line, sizeof(line), stdin) != NULL)) {
173 uint32_t pnn, flags, capabilities;
176 ctdb_sock_addr saddr;
180 if (line[0] == '\n') {
184 /* Get rid of pesky newline */
185 if ((t = strchr(line, '\n')) != NULL) {
190 tok = strtok(line, " \t");
192 fprintf(stderr, "bad line (%s) - missing PNN\n", line);
195 pnn = (uint32_t)strtoul(tok, NULL, 0);
198 tok = strtok(NULL, " \t");
200 fprintf(stderr, "bad line (%s) - missing IP\n", line);
203 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
205 fprintf(stderr, "bad line (%s) - invalid IP\n", line);
208 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
209 ip = talloc_strdup(node_map, tok);
215 tok = strtok(NULL, " \t");
217 fprintf(stderr, "bad line (%s) - missing flags\n",
221 flags = (uint32_t)strtoul(tok, NULL, 0);
222 /* Handle deleted nodes */
223 if (flags & NODE_FLAGS_DELETED) {
225 ip = talloc_strdup(node_map, "0.0.0.0");
230 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
232 tok = strtok(NULL, " \t");
233 while (tok != NULL) {
234 if (strcmp(tok, "CURRENT") == 0) {
236 } else if (strcmp(tok, "RECMASTER") == 0) {
237 node_map->recmaster = pnn;
238 } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
239 capabilities &= ~CTDB_CAP_RECMASTER;
240 } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
241 capabilities &= ~CTDB_CAP_LMASTER;
242 } else if (strcmp(tok, "TIMEOUT") == 0) {
243 /* This can be done with just a flag
244 * value but it is probably clearer
245 * and less error-prone to fake this
246 * with an explicit token */
247 flags |= NODE_FLAGS_FAKE_TIMEOUT;
249 tok = strtok(NULL, " \t");
252 node_map->node = talloc_realloc(node_map, node_map->node,
254 node_map->num_nodes + 1);
255 if (node_map->node == NULL) {
258 node = &node_map->node[node_map->num_nodes];
260 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
262 fprintf(stderr, "bad line (%s) - invalid IP\n", line);
265 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
268 node->capabilities = capabilities;
269 node->recovery_disabled = false;
270 node->recovery_substate = NULL;
272 node_map->num_nodes += 1;
275 if (node_map->num_nodes == 0) {
279 DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
283 DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
288 /* Append a node to a node map with given address and flags */
289 static bool node_map_add(struct ctdb_node_map *nodemap,
290 const char *nstr, uint32_t flags)
294 struct ctdb_node_and_flags *n;
297 ret = ctdb_sock_addr_from_string(nstr, &addr, false);
299 fprintf(stderr, "Invalid IP address %s\n", nstr);
302 ctdb_sock_addr_set_port(&addr, CTDB_PORT);
305 nodemap->node = talloc_realloc(nodemap, nodemap->node,
306 struct ctdb_node_and_flags, num+1);
307 if (nodemap->node == NULL) {
311 n = &nodemap->node[num];
316 nodemap->num = num+1;
320 /* Read a nodes file into a node map */
321 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
327 struct ctdb_node_map *nodemap;
329 nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
330 if (nodemap == NULL) {
334 lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
339 while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
343 for (i=0; i<nlines; i++) {
349 /* strip leading spaces */
350 while((*node == ' ') || (*node == '\t')) {
356 /* strip trailing spaces */
358 ((node[len-1] == ' ') || (node[len-1] == '\t')))
368 /* A "deleted" node is a node that is
369 commented out in the nodes file. This is
370 used instead of removing a line, which
371 would cause subsequent nodes to change
373 flags = NODE_FLAGS_DELETED;
374 node = discard_const("0.0.0.0");
378 if (! node_map_add(nodemap, node, flags)) {
380 TALLOC_FREE(nodemap);
389 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
392 struct ctdb_node_map *nodemap;
393 char nodes_list[PATH_MAX];
394 const char *ctdb_base;
397 ctdb_base = getenv("CTDB_BASE");
398 if (ctdb_base == NULL) {
399 D_ERR("CTDB_BASE is not set\n");
403 /* read optional node-specific nodes file */
404 num = snprintf(nodes_list, sizeof(nodes_list),
405 "%s/nodes.%d", ctdb_base, pnn);
406 if (num == sizeof(nodes_list)) {
407 D_ERR("nodes file path too long\n");
410 nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
411 if (nodemap != NULL) {
412 /* Fake a load failure for an empty nodemap */
413 if (nodemap->num == 0) {
414 talloc_free(nodemap);
416 D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
423 /* read normal nodes file */
424 num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
425 if (num == sizeof(nodes_list)) {
426 D_ERR("nodes file path too long\n");
429 nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
430 if (nodemap != NULL) {
434 DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
438 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
440 struct interface_map *iface_map;
442 iface_map = talloc_zero(mem_ctx, struct interface_map);
443 if (iface_map == NULL) {
450 /* Read interfaces information. Same format as "ctdb ifaces -Y"
452 * :Name:LinkStatus:References:
457 static bool interfaces_parse(struct interface_map *iface_map)
461 while ((fgets(line, sizeof(line), stdin) != NULL)) {
464 char *tok, *t, *name;
465 struct interface *iface;
467 if (line[0] == '\n') {
471 /* Get rid of pesky newline */
472 if ((t = strchr(line, '\n')) != NULL) {
476 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
480 /* Leading colon... */
481 // tok = strtok(line, ":");
484 tok = strtok(line, ":");
486 fprintf(stderr, "bad line (%s) - missing name\n", line);
492 tok = strtok(NULL, ":");
494 fprintf(stderr, "bad line (%s) - missing link state\n",
498 link_state = (uint16_t)strtoul(tok, NULL, 0);
501 tok = strtok(NULL, ":");
503 fprintf(stderr, "bad line (%s) - missing references\n",
507 references = (uint32_t)strtoul(tok, NULL, 0);
509 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
512 if (iface_map->iface == NULL) {
516 iface = &iface_map->iface[iface_map->num];
518 iface->name = talloc_strdup(iface_map, name);
519 if (iface->name == NULL) {
522 iface->link_up = link_state;
523 iface->references = references;
528 if (iface_map->num == 0) {
532 DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
536 fprintf(stderr, "Parsing interfaces failed\n");
540 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
542 struct vnn_map *vnn_map;
544 vnn_map = talloc_zero(mem_ctx, struct vnn_map);
545 if (vnn_map == NULL) {
546 fprintf(stderr, "Memory error\n");
549 vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
550 vnn_map->generation = INVALID_GENERATION;
563 static bool vnnmap_parse(struct vnn_map *vnn_map)
567 while (fgets(line, sizeof(line), stdin) != NULL) {
571 if (line[0] == '\n') {
575 /* Get rid of pesky newline */
576 if ((t = strchr(line, '\n')) != NULL) {
580 n = (uint32_t) strtol(line, NULL, 0);
583 if (vnn_map->generation == INVALID_GENERATION) {
584 vnn_map->generation = n;
588 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
590 if (vnn_map->map == NULL) {
591 fprintf(stderr, "Memory error\n");
595 vnn_map->map[vnn_map->size] = n;
599 if (vnn_map->size == 0) {
603 DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
607 fprintf(stderr, "Parsing vnnmap failed\n");
611 static bool reclock_parse(struct ctdbd_context *ctdb)
616 if (fgets(line, sizeof(line), stdin) == NULL) {
620 if (line[0] == '\n') {
624 /* Get rid of pesky newline */
625 if ((t = strchr(line, '\n')) != NULL) {
629 ctdb->reclock = talloc_strdup(ctdb, line);
630 if (ctdb->reclock == NULL) {
634 /* Swallow possible blank line following section. Picky
635 * compiler settings don't allow the return value to be
636 * ignored, so make the compiler happy.
638 if (fgets(line, sizeof(line), stdin) == NULL) {
641 DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
645 fprintf(stderr, "Parsing reclock failed\n");
649 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
652 struct database_map *db_map;
654 db_map = talloc_zero(mem_ctx, struct database_map);
655 if (db_map == NULL) {
659 db_map->dbdir = talloc_strdup(db_map, dbdir);
660 if (db_map->dbdir == NULL) {
668 /* Read a database map from stdin. Each line looks like:
669 * <ID> <NAME> [FLAGS] [SEQ_NUM]
670 * EOF or a blank line terminates input.
672 * By default, flags and seq_num are 0
675 static bool dbmap_parse(struct database_map *db_map)
679 while ((fgets(line, sizeof(line), stdin) != NULL)) {
682 uint32_t seq_num = 0;
687 if (line[0] == '\n') {
691 /* Get rid of pesky newline */
692 if ((t = strchr(line, '\n')) != NULL) {
697 tok = strtok(line, " \t");
699 fprintf(stderr, "bad line (%s) - missing ID\n", line);
702 id = (uint32_t)strtoul(tok, NULL, 0);
705 tok = strtok(NULL, " \t");
707 fprintf(stderr, "bad line (%s) - missing NAME\n", line);
710 name = talloc_strdup(db_map, tok);
716 tok = strtok(NULL, " \t");
717 while (tok != NULL) {
718 if (strcmp(tok, "PERSISTENT") == 0) {
719 flags |= CTDB_DB_FLAGS_PERSISTENT;
720 } else if (strcmp(tok, "STICKY") == 0) {
721 flags |= CTDB_DB_FLAGS_STICKY;
722 } else if (strcmp(tok, "READONLY") == 0) {
723 flags |= CTDB_DB_FLAGS_READONLY;
724 } else if (strcmp(tok, "REPLICATED") == 0) {
725 flags |= CTDB_DB_FLAGS_REPLICATED;
726 } else if (tok[0] >= '0'&& tok[0] <= '9') {
727 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
728 CTDB_DB_FLAGS_REPLICATED;
730 if ((flags & nv) == 0) {
732 "seq_num for volatile db\n");
735 seq_num = (uint64_t)strtoull(tok, NULL, 0);
738 tok = strtok(NULL, " \t");
741 db = talloc_zero(db_map, struct database);
747 db->name = talloc_steal(db, name);
748 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
749 if (db->path == NULL) {
754 db->seq_num = seq_num;
756 DLIST_ADD_END(db_map->db, db);
759 if (db_map->db == NULL) {
763 DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
767 DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
772 static struct database *database_find(struct database_map *db_map,
777 for (db = db_map->db; db != NULL; db = db->next) {
778 if (db->id == db_id) {
786 static int database_count(struct database_map *db_map)
791 for (db = db_map->db; db != NULL; db = db->next) {
798 static int database_flags(uint8_t db_flags)
802 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
803 tdb_flags = TDB_DEFAULT;
805 /* volatile and replicated use the same flags */
806 tdb_flags = TDB_NOSYNC |
808 TDB_INCOMPATIBLE_HASH;
811 tdb_flags |= TDB_DISALLOW_NESTING;
816 static struct database *database_new(struct database_map *db_map,
817 const char *name, uint8_t flags)
823 db = talloc_zero(db_map, struct database);
828 db->name = talloc_strdup(db, name);
829 if (db->name == NULL) {
833 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
834 if (db->path == NULL) {
838 key.dsize = strlen(db->name) + 1;
839 key.dptr = discard_const(db->name);
841 db->id = tdb_jenkins_hash(&key);
844 tdb_flags = database_flags(flags);
846 db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
847 if (db->tdb == NULL) {
848 DBG_ERR("tdb_open\n");
852 DLIST_ADD_END(db_map->db, db);
856 DBG_ERR("Memory error\n");
862 static int ltdb_store(struct database *db, TDB_DATA key,
863 struct ctdb_ltdb_header *header, TDB_DATA data)
866 bool db_volatile = true;
869 if (db->tdb == NULL) {
873 if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
874 (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
878 if (data.dsize > 0) {
881 if (db_volatile && header->rsn == 0) {
889 rec[0].dsize = ctdb_ltdb_header_len(header);
890 rec[0].dptr = (uint8_t *)header;
892 rec[1].dsize = data.dsize;
893 rec[1].dptr = data.dptr;
895 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
897 if (header->rsn > 0) {
898 ret = tdb_delete(db->tdb, key);
907 static int ltdb_fetch(struct database *db, TDB_DATA key,
908 struct ctdb_ltdb_header *header,
909 TALLOC_CTX *mem_ctx, TDB_DATA *data)
915 if (db->tdb == NULL) {
919 rec = tdb_fetch(db->tdb, key);
920 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
922 if (rec.dptr != NULL) {
926 *header = (struct ctdb_ltdb_header) {
932 ret = ltdb_store(db, key, header, tdb_null);
941 data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
942 data->dptr = talloc_memdup(mem_ctx,
943 rec.dptr + ctdb_ltdb_header_len(header),
948 if (data->dptr == NULL) {
955 static int database_seqnum(struct database *db, uint64_t *seqnum)
957 const char *keyname = CTDB_DB_SEQNUM_KEY;
959 struct ctdb_ltdb_header header;
963 if (db->tdb == NULL) {
964 *seqnum = db->seq_num;
968 key.dptr = discard_const(keyname);
969 key.dsize = strlen(keyname) + 1;
971 ret = ltdb_fetch(db, key, &header, db, &data);
976 if (data.dsize == 0) {
981 ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
982 talloc_free(data.dptr);
990 static int ltdb_transaction_update(uint32_t reqid,
991 struct ctdb_ltdb_header *no_header,
992 TDB_DATA key, TDB_DATA data,
995 struct database *db = (struct database *)private_data;
996 TALLOC_CTX *tmp_ctx = talloc_new(db);
997 struct ctdb_ltdb_header header = { 0 }, oldheader;
1001 if (db->tdb == NULL) {
1005 ret = ctdb_ltdb_header_extract(&data, &header);
1010 ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
1015 if (olddata.dsize > 0) {
1016 if (oldheader.rsn > header.rsn ||
1017 (oldheader.rsn == header.rsn &&
1018 olddata.dsize != data.dsize)) {
1023 talloc_free(tmp_ctx);
1025 ret = ltdb_store(db, key, &header, data);
1029 static int ltdb_transaction(struct database *db,
1030 struct ctdb_rec_buffer *recbuf)
1034 if (db->tdb == NULL) {
1038 ret = tdb_transaction_start(db->tdb);
1043 ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
1045 tdb_transaction_cancel(db->tdb);
1048 ret = tdb_transaction_commit(db->tdb);
1052 static bool public_ips_parse(struct ctdbd_context *ctdb,
1057 if (numnodes == 0) {
1058 D_ERR("Must initialise nodemap before public IPs\n");
1062 ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
1064 status = (ctdb->known_ips != NULL && ctdb->known_ips->num != 0);
1067 D_INFO("Parsing public IPs done\n");
1069 D_INFO("Parsing public IPs failed\n");
1075 /* Read information about controls to fail. Format is:
1076 * <opcode> <pnn> {ERROR|TIMEOUT} <comment>
1078 static bool control_failures_parse(struct ctdbd_context *ctdb)
1082 while ((fgets(line, sizeof(line), stdin) != NULL)) {
1084 enum ctdb_controls opcode;
1087 const char *comment;
1088 struct fake_control_failure *failure = NULL;
1090 if (line[0] == '\n') {
1094 /* Get rid of pesky newline */
1095 if ((t = strchr(line, '\n')) != NULL) {
1100 tok = strtok(line, " \t");
1102 D_ERR("bad line (%s) - missing opcode\n", line);
1105 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1108 tok = strtok(NULL, " \t");
1110 D_ERR("bad line (%s) - missing PNN\n", line);
1113 pnn = (uint32_t)strtoul(tok, NULL, 0);
1116 tok = strtok(NULL, " \t");
1118 D_ERR("bad line (%s) - missing errno\n", line);
1121 error = talloc_strdup(ctdb, tok);
1122 if (error == NULL) {
1125 if (strcmp(error, "ERROR") != 0 &&
1126 strcmp(error, "TIMEOUT") != 0) {
1127 D_ERR("bad line (%s) "
1128 "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1134 tok = strtok(NULL, "\n"); /* rest of line */
1136 D_ERR("bad line (%s) - missing comment\n", line);
1139 comment = talloc_strdup(ctdb, tok);
1140 if (comment == NULL) {
1144 failure = talloc_zero(ctdb, struct fake_control_failure);
1145 if (failure == NULL) {
1149 failure->opcode = opcode;
1151 failure->error = error;
1152 failure->comment = comment;
1154 DLIST_ADD(ctdb->control_failures, failure);
1157 if (ctdb->control_failures == NULL) {
1161 D_INFO("Parsing fake control failures done\n");
1165 D_INFO("Parsing fake control failures failed\n");
1169 static bool runstate_parse(struct ctdbd_context *ctdb)
1174 if (fgets(line, sizeof(line), stdin) == NULL) {
1178 if (line[0] == '\n') {
1182 /* Get rid of pesky newline */
1183 if ((t = strchr(line, '\n')) != NULL) {
1187 ctdb->runstate = ctdb_runstate_from_string(line);
1188 if (ctdb->runstate == CTDB_RUNSTATE_UNKNOWN) {
1192 /* Swallow possible blank line following section. Picky
1193 * compiler settings don't allow the return value to be
1194 * ignored, so make the compiler happy.
1196 if (fgets(line, sizeof(line), stdin) == NULL) {
1199 D_INFO("Parsing runstate done\n");
1203 D_ERR("Parsing runstate failed\n");
1211 static int ctdb_client_destructor(struct ctdb_client *client)
1213 DLIST_REMOVE(client->ctdb->client_list, client);
1217 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1220 struct ctdb_client *client;
1222 client = talloc_zero(client_state, struct ctdb_client);
1223 if (client == NULL) {
1227 client->ctdb = ctdb;
1228 client->pid = client_pid;
1229 client->state = client_state;
1231 DLIST_ADD(ctdb->client_list, client);
1232 talloc_set_destructor(client, ctdb_client_destructor);
1236 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1238 struct ctdb_client *client;
1240 for (client=ctdb->client_list; client != NULL; client=client->next) {
1241 if (client->pid == client_pid) {
1242 return client->state;
1250 * CTDB context setup
1253 static uint32_t new_generation(uint32_t old_generation)
1255 uint32_t generation;
1258 generation = random();
1259 if (generation != INVALID_GENERATION &&
1260 generation != old_generation) {
1268 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1271 struct ctdbd_context *ctdb;
1276 ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1281 ctdb->node_map = nodemap_init(ctdb);
1282 if (ctdb->node_map == NULL) {
1286 ctdb->iface_map = interfaces_init(ctdb);
1287 if (ctdb->iface_map == NULL) {
1291 ctdb->vnn_map = vnnmap_init(ctdb);
1292 if (ctdb->vnn_map == NULL) {
1296 ctdb->db_map = dbmap_init(ctdb, dbdir);
1297 if (ctdb->db_map == NULL) {
1301 ret = srvid_init(ctdb, &ctdb->srv);
1306 ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1308 while (fgets(line, sizeof(line), stdin) != NULL) {
1311 if ((t = strchr(line, '\n')) != NULL) {
1315 if (strcmp(line, "NODEMAP") == 0) {
1316 status = nodemap_parse(ctdb->node_map);
1317 } else if (strcmp(line, "IFACES") == 0) {
1318 status = interfaces_parse(ctdb->iface_map);
1319 } else if (strcmp(line, "VNNMAP") == 0) {
1320 status = vnnmap_parse(ctdb->vnn_map);
1321 } else if (strcmp(line, "DBMAP") == 0) {
1322 status = dbmap_parse(ctdb->db_map);
1323 } else if (strcmp(line, "PUBLICIPS") == 0) {
1324 status = public_ips_parse(ctdb,
1325 ctdb->node_map->num_nodes);
1326 } else if (strcmp(line, "RECLOCK") == 0) {
1327 status = reclock_parse(ctdb);
1328 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1329 status = control_failures_parse(ctdb);
1330 } else if (strcmp(line, "RUNSTATE") == 0) {
1331 status = runstate_parse(ctdb);
1333 fprintf(stderr, "Unknown line %s\n", line);
1342 ctdb->start_time = tevent_timeval_current();
1343 ctdb->recovery_start_time = tevent_timeval_current();
1344 ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1345 if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1346 ctdb->vnn_map->generation =
1347 new_generation(ctdb->vnn_map->generation);
1349 ctdb->recovery_end_time = tevent_timeval_current();
1351 ctdb->log_level = DEBUG_ERR;
1353 ctdb_tunable_set_defaults(&ctdb->tun_list);
1362 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1367 if (ctdb->node_map->num_nodes == 0) {
1371 /* Make sure all the nodes are in order */
1372 for (i=0; i<ctdb->node_map->num_nodes; i++) {
1373 node = &ctdb->node_map->node[i];
1374 if (node->pnn != i) {
1375 fprintf(stderr, "Expected node %u, found %u\n",
1381 node = &ctdb->node_map->node[ctdb->node_map->pnn];
1382 if (node->flags & NODE_FLAGS_DISCONNECTED) {
1383 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1394 struct recover_state {
1395 struct tevent_context *ev;
1396 struct ctdbd_context *ctdb;
1399 static int recover_check(struct tevent_req *req);
1400 static void recover_wait_done(struct tevent_req *subreq);
1401 static void recover_done(struct tevent_req *subreq);
1403 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1404 struct tevent_context *ev,
1405 struct ctdbd_context *ctdb)
1407 struct tevent_req *req;
1408 struct recover_state *state;
1411 req = tevent_req_create(mem_ctx, &state, struct recover_state);
1419 ret = recover_check(req);
1421 tevent_req_error(req, ret);
1422 return tevent_req_post(req, ev);
1428 static int recover_check(struct tevent_req *req)
1430 struct recover_state *state = tevent_req_data(
1431 req, struct recover_state);
1432 struct ctdbd_context *ctdb = state->ctdb;
1433 struct tevent_req *subreq;
1434 bool recovery_disabled;
1437 recovery_disabled = false;
1438 for (i=0; i<ctdb->node_map->num_nodes; i++) {
1439 if (ctdb->node_map->node[i].recovery_disabled) {
1440 recovery_disabled = true;
1445 subreq = tevent_wakeup_send(state, state->ev,
1446 tevent_timeval_current_ofs(1, 0));
1447 if (subreq == NULL) {
1451 if (recovery_disabled) {
1452 tevent_req_set_callback(subreq, recover_wait_done, req);
1454 ctdb->recovery_start_time = tevent_timeval_current();
1455 tevent_req_set_callback(subreq, recover_done, req);
1461 static void recover_wait_done(struct tevent_req *subreq)
1463 struct tevent_req *req = tevent_req_callback_data(
1464 subreq, struct tevent_req);
1468 status = tevent_wakeup_recv(subreq);
1469 TALLOC_FREE(subreq);
1471 tevent_req_error(req, EIO);
1475 ret = recover_check(req);
1477 tevent_req_error(req, ret);
1481 static void recover_done(struct tevent_req *subreq)
1483 struct tevent_req *req = tevent_req_callback_data(
1484 subreq, struct tevent_req);
1485 struct recover_state *state = tevent_req_data(
1486 req, struct recover_state);
1487 struct ctdbd_context *ctdb = state->ctdb;
1490 status = tevent_wakeup_recv(subreq);
1491 TALLOC_FREE(subreq);
1493 tevent_req_error(req, EIO);
1497 ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1498 ctdb->recovery_end_time = tevent_timeval_current();
1499 ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1501 tevent_req_done(req);
1504 static bool recover_recv(struct tevent_req *req, int *perr)
1508 if (tevent_req_is_unix_error(req, &err)) {
1519 * Routines for ctdb_req_header
1522 static void header_fix_pnn(struct ctdb_req_header *header,
1523 struct ctdbd_context *ctdb)
1525 if (header->srcnode == CTDB_CURRENT_NODE) {
1526 header->srcnode = ctdb->node_map->pnn;
1529 if (header->destnode == CTDB_CURRENT_NODE) {
1530 header->destnode = ctdb->node_map->pnn;
1534 static struct ctdb_req_header header_reply_call(
1535 struct ctdb_req_header *header,
1536 struct ctdbd_context *ctdb)
1538 struct ctdb_req_header reply_header;
1540 reply_header = (struct ctdb_req_header) {
1541 .ctdb_magic = CTDB_MAGIC,
1542 .ctdb_version = CTDB_PROTOCOL,
1543 .generation = ctdb->vnn_map->generation,
1544 .operation = CTDB_REPLY_CALL,
1545 .destnode = header->srcnode,
1546 .srcnode = header->destnode,
1547 .reqid = header->reqid,
1550 return reply_header;
1553 static struct ctdb_req_header header_reply_control(
1554 struct ctdb_req_header *header,
1555 struct ctdbd_context *ctdb)
1557 struct ctdb_req_header reply_header;
1559 reply_header = (struct ctdb_req_header) {
1560 .ctdb_magic = CTDB_MAGIC,
1561 .ctdb_version = CTDB_PROTOCOL,
1562 .generation = ctdb->vnn_map->generation,
1563 .operation = CTDB_REPLY_CONTROL,
1564 .destnode = header->srcnode,
1565 .srcnode = header->destnode,
1566 .reqid = header->reqid,
1569 return reply_header;
1572 static struct ctdb_req_header header_reply_message(
1573 struct ctdb_req_header *header,
1574 struct ctdbd_context *ctdb)
1576 struct ctdb_req_header reply_header;
1578 reply_header = (struct ctdb_req_header) {
1579 .ctdb_magic = CTDB_MAGIC,
1580 .ctdb_version = CTDB_PROTOCOL,
1581 .generation = ctdb->vnn_map->generation,
1582 .operation = CTDB_REQ_MESSAGE,
1583 .destnode = header->srcnode,
1584 .srcnode = header->destnode,
1588 return reply_header;
1595 struct client_state {
1596 struct tevent_context *ev;
1598 struct ctdbd_context *ctdb;
1601 struct comm_context *comm;
1602 struct srvid_register_state *rstate;
1607 * Send replies to call, controls and messages
1610 static void client_reply_done(struct tevent_req *subreq);
1612 static void client_send_call(struct tevent_req *req,
1613 struct ctdb_req_header *header,
1614 struct ctdb_reply_call *reply)
1616 struct client_state *state = tevent_req_data(
1617 req, struct client_state);
1618 struct ctdbd_context *ctdb = state->ctdb;
1619 struct tevent_req *subreq;
1620 struct ctdb_req_header reply_header;
1622 size_t datalen, buflen;
1625 reply_header = header_reply_call(header, ctdb);
1627 datalen = ctdb_reply_call_len(&reply_header, reply);
1628 ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1630 tevent_req_error(req, ret);
1634 ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
1636 tevent_req_error(req, ret);
1640 subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1641 if (tevent_req_nomem(subreq, req)) {
1644 tevent_req_set_callback(subreq, client_reply_done, req);
1646 talloc_steal(subreq, buf);
1649 static void client_send_message(struct tevent_req *req,
1650 struct ctdb_req_header *header,
1651 struct ctdb_req_message_data *message)
1653 struct client_state *state = tevent_req_data(
1654 req, struct client_state);
1655 struct ctdbd_context *ctdb = state->ctdb;
1656 struct tevent_req *subreq;
1657 struct ctdb_req_header reply_header;
1659 size_t datalen, buflen;
1662 reply_header = header_reply_message(header, ctdb);
1664 datalen = ctdb_req_message_data_len(&reply_header, message);
1665 ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1667 tevent_req_error(req, ret);
1671 ret = ctdb_req_message_data_push(&reply_header, message,
1674 tevent_req_error(req, ret);
1678 DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1680 subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1681 if (tevent_req_nomem(subreq, req)) {
1684 tevent_req_set_callback(subreq, client_reply_done, req);
1686 talloc_steal(subreq, buf);
1689 static void client_send_control(struct tevent_req *req,
1690 struct ctdb_req_header *header,
1691 struct ctdb_reply_control *reply)
1693 struct client_state *state = tevent_req_data(
1694 req, struct client_state);
1695 struct ctdbd_context *ctdb = state->ctdb;
1696 struct tevent_req *subreq;
1697 struct ctdb_req_header reply_header;
1699 size_t datalen, buflen;
1702 reply_header = header_reply_control(header, ctdb);
1704 datalen = ctdb_reply_control_len(&reply_header, reply);
1705 ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1707 tevent_req_error(req, ret);
1711 ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1713 tevent_req_error(req, ret);
1717 DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1719 subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1720 if (tevent_req_nomem(subreq, req)) {
1723 tevent_req_set_callback(subreq, client_reply_done, req);
1725 talloc_steal(subreq, buf);
1728 static void client_reply_done(struct tevent_req *subreq)
1730 struct tevent_req *req = tevent_req_callback_data(
1731 subreq, struct tevent_req);
1735 status = comm_write_recv(subreq, &ret);
1736 TALLOC_FREE(subreq);
1738 tevent_req_error(req, ret);
1743 * Handling protocol - controls
1746 static void control_process_exists(TALLOC_CTX *mem_ctx,
1747 struct tevent_req *req,
1748 struct ctdb_req_header *header,
1749 struct ctdb_req_control *request)
1751 struct client_state *state = tevent_req_data(
1752 req, struct client_state);
1753 struct ctdbd_context *ctdb = state->ctdb;
1754 struct client_state *cstate;
1755 struct ctdb_reply_control reply;
1757 reply.rdata.opcode = request->opcode;
1759 cstate = client_find(ctdb, request->rdata.data.pid);
1760 if (cstate == NULL) {
1762 reply.errmsg = "No client for PID";
1764 reply.status = kill(request->rdata.data.pid, 0);
1765 reply.errmsg = NULL;
1768 client_send_control(req, header, &reply);
1771 static void control_ping(TALLOC_CTX *mem_ctx,
1772 struct tevent_req *req,
1773 struct ctdb_req_header *header,
1774 struct ctdb_req_control *request)
1776 struct client_state *state = tevent_req_data(
1777 req, struct client_state);
1778 struct ctdbd_context *ctdb = state->ctdb;
1779 struct ctdb_reply_control reply;
1781 reply.rdata.opcode = request->opcode;
1782 reply.status = ctdb->num_clients;
1783 reply.errmsg = NULL;
1785 client_send_control(req, header, &reply);
1788 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1789 struct tevent_req *req,
1790 struct ctdb_req_header *header,
1791 struct ctdb_req_control *request)
1793 struct client_state *state = tevent_req_data(
1794 req, struct client_state);
1795 struct ctdbd_context *ctdb = state->ctdb;
1796 struct ctdb_reply_control reply;
1797 struct database *db;
1799 reply.rdata.opcode = request->opcode;
1801 db = database_find(ctdb->db_map, request->rdata.data.db_id);
1803 reply.status = ENOENT;
1804 reply.errmsg = "Database not found";
1806 reply.rdata.data.db_path =
1807 talloc_strdup(mem_ctx, db->path);
1808 if (reply.rdata.data.db_path == NULL) {
1809 reply.status = ENOMEM;
1810 reply.errmsg = "Memory error";
1813 reply.errmsg = NULL;
1817 client_send_control(req, header, &reply);
1820 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1821 struct tevent_req *req,
1822 struct ctdb_req_header *header,
1823 struct ctdb_req_control *request)
1825 struct client_state *state = tevent_req_data(
1826 req, struct client_state);
1827 struct ctdbd_context *ctdb = state->ctdb;
1828 struct ctdb_reply_control reply;
1829 struct ctdb_vnn_map *vnnmap;
1831 reply.rdata.opcode = request->opcode;
1833 vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1834 if (vnnmap == NULL) {
1835 reply.status = ENOMEM;
1836 reply.errmsg = "Memory error";
1838 vnnmap->generation = ctdb->vnn_map->generation;
1839 vnnmap->size = ctdb->vnn_map->size;
1840 vnnmap->map = ctdb->vnn_map->map;
1842 reply.rdata.data.vnnmap = vnnmap;
1844 reply.errmsg = NULL;
1847 client_send_control(req, header, &reply);
1850 static void control_get_debug(TALLOC_CTX *mem_ctx,
1851 struct tevent_req *req,
1852 struct ctdb_req_header *header,
1853 struct ctdb_req_control *request)
1855 struct client_state *state = tevent_req_data(
1856 req, struct client_state);
1857 struct ctdbd_context *ctdb = state->ctdb;
1858 struct ctdb_reply_control reply;
1860 reply.rdata.opcode = request->opcode;
1861 reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1863 reply.errmsg = NULL;
1865 client_send_control(req, header, &reply);
1868 static void control_set_debug(TALLOC_CTX *mem_ctx,
1869 struct tevent_req *req,
1870 struct ctdb_req_header *header,
1871 struct ctdb_req_control *request)
1873 struct client_state *state = tevent_req_data(
1874 req, struct client_state);
1875 struct ctdbd_context *ctdb = state->ctdb;
1876 struct ctdb_reply_control reply;
1878 ctdb->log_level = (int)request->rdata.data.loglevel;
1880 reply.rdata.opcode = request->opcode;
1882 reply.errmsg = NULL;
1884 client_send_control(req, header, &reply);
1887 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1888 struct tevent_req *req,
1889 struct ctdb_req_header *header,
1890 struct ctdb_req_control *request)
1892 struct client_state *state = tevent_req_data(
1893 req, struct client_state);
1894 struct ctdbd_context *ctdb = state->ctdb;
1895 struct ctdb_reply_control reply;
1896 struct ctdb_dbid_map *dbmap;
1897 struct database *db;
1900 reply.rdata.opcode = request->opcode;
1902 dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1903 if (dbmap == NULL) {
1907 dbmap->num = database_count(ctdb->db_map);
1908 dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1909 if (dbmap->dbs == NULL) {
1913 db = ctdb->db_map->db;
1914 for (i = 0; i < dbmap->num; i++) {
1915 dbmap->dbs[i] = (struct ctdb_dbid) {
1923 reply.rdata.data.dbmap = dbmap;
1925 reply.errmsg = NULL;
1926 client_send_control(req, header, &reply);
1931 reply.errmsg = "Memory error";
1932 client_send_control(req, header, &reply);
1935 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1936 struct tevent_req *req,
1937 struct ctdb_req_header *header,
1938 struct ctdb_req_control *request)
1940 struct client_state *state = tevent_req_data(
1941 req, struct client_state);
1942 struct ctdbd_context *ctdb = state->ctdb;
1943 struct ctdb_reply_control reply;
1945 reply.rdata.opcode = request->opcode;
1946 reply.status = ctdb->vnn_map->recmode;
1947 reply.errmsg = NULL;
1949 client_send_control(req, header, &reply);
1952 struct set_recmode_state {
1953 struct tevent_req *req;
1954 struct ctdbd_context *ctdb;
1955 struct ctdb_req_header header;
1956 struct ctdb_reply_control reply;
1959 static void set_recmode_callback(struct tevent_req *subreq)
1961 struct set_recmode_state *substate = tevent_req_callback_data(
1962 subreq, struct set_recmode_state);
1966 status = recover_recv(subreq, &ret);
1967 TALLOC_FREE(subreq);
1969 substate->reply.status = ret;
1970 substate->reply.errmsg = "recovery failed";
1972 substate->reply.status = 0;
1973 substate->reply.errmsg = NULL;
1976 client_send_control(substate->req, &substate->header, &substate->reply);
1977 talloc_free(substate);
1980 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1981 struct tevent_req *req,
1982 struct ctdb_req_header *header,
1983 struct ctdb_req_control *request)
1985 struct client_state *state = tevent_req_data(
1986 req, struct client_state);
1987 struct tevent_req *subreq;
1988 struct ctdbd_context *ctdb = state->ctdb;
1989 struct set_recmode_state *substate;
1990 struct ctdb_reply_control reply;
1992 reply.rdata.opcode = request->opcode;
1994 if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1996 reply.errmsg = "Client cannot set recmode to NORMAL";
2000 substate = talloc_zero(ctdb, struct set_recmode_state);
2001 if (substate == NULL) {
2003 reply.errmsg = "Memory error";
2007 substate->req = req;
2008 substate->ctdb = ctdb;
2009 substate->header = *header;
2010 substate->reply.rdata.opcode = request->opcode;
2012 subreq = recover_send(substate, state->ev, state->ctdb);
2013 if (subreq == NULL) {
2014 talloc_free(substate);
2017 tevent_req_set_callback(subreq, set_recmode_callback, substate);
2019 ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
2023 client_send_control(req, header, &reply);
2027 static void control_db_attach(TALLOC_CTX *mem_ctx,
2028 struct tevent_req *req,
2029 struct ctdb_req_header *header,
2030 struct ctdb_req_control *request)
2032 struct client_state *state = tevent_req_data(
2033 req, struct client_state);
2034 struct ctdbd_context *ctdb = state->ctdb;
2035 struct ctdb_reply_control reply;
2036 struct database *db;
2038 reply.rdata.opcode = request->opcode;
2040 for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2041 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2046 db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
2049 reply.errmsg = "Failed to attach database";
2050 client_send_control(req, header, &reply);
2055 reply.rdata.data.db_id = db->id;
2057 reply.errmsg = NULL;
2058 client_send_control(req, header, &reply);
2061 static void srvid_handler_done(struct tevent_req *subreq);
2063 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2065 struct client_state *state = talloc_get_type_abort(
2066 private_data, struct client_state);
2067 struct ctdbd_context *ctdb = state->ctdb;
2068 struct tevent_req *subreq;
2069 struct ctdb_req_header request_header;
2070 struct ctdb_req_message_data message;
2072 size_t datalen, buflen;
2075 request_header = (struct ctdb_req_header) {
2076 .ctdb_magic = CTDB_MAGIC,
2077 .ctdb_version = CTDB_PROTOCOL,
2078 .generation = ctdb->vnn_map->generation,
2079 .operation = CTDB_REQ_MESSAGE,
2080 .destnode = state->pnn,
2081 .srcnode = ctdb->node_map->recmaster,
2085 message = (struct ctdb_req_message_data) {
2090 datalen = ctdb_req_message_data_len(&request_header, &message);
2091 ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
2096 ret = ctdb_req_message_data_push(&request_header,
2105 subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
2106 if (subreq == NULL) {
2110 tevent_req_set_callback(subreq, srvid_handler_done, state);
2112 talloc_steal(subreq, buf);
2115 static void srvid_handler_done(struct tevent_req *subreq)
2117 struct client_state *state = tevent_req_callback_data(
2118 subreq, struct client_state);
2122 ok = comm_write_recv(subreq, &ret);
2123 TALLOC_FREE(subreq);
2126 ("Failed to dispatch message to client pid=%u, ret=%d\n",
2132 static void control_register_srvid(TALLOC_CTX *mem_ctx,
2133 struct tevent_req *req,
2134 struct ctdb_req_header *header,
2135 struct ctdb_req_control *request)
2137 struct client_state *state = tevent_req_data(
2138 req, struct client_state);
2139 struct ctdbd_context *ctdb = state->ctdb;
2140 struct ctdb_reply_control reply;
2143 reply.rdata.opcode = request->opcode;
2145 ret = srvid_register(ctdb->srv, state, request->srvid,
2146 srvid_handler, state);
2149 reply.errmsg = "Memory error";
2153 DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
2156 reply.errmsg = NULL;
2159 client_send_control(req, header, &reply);
2162 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
2163 struct tevent_req *req,
2164 struct ctdb_req_header *header,
2165 struct ctdb_req_control *request)
2167 struct client_state *state = tevent_req_data(
2168 req, struct client_state);
2169 struct ctdbd_context *ctdb = state->ctdb;
2170 struct ctdb_reply_control reply;
2173 reply.rdata.opcode = request->opcode;
2175 ret = srvid_deregister(ctdb->srv, request->srvid, state);
2178 reply.errmsg = "srvid not registered";
2182 DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
2185 reply.errmsg = NULL;
2188 client_send_control(req, header, &reply);
2191 static void control_get_dbname(TALLOC_CTX *mem_ctx,
2192 struct tevent_req *req,
2193 struct ctdb_req_header *header,
2194 struct ctdb_req_control *request)
2196 struct client_state *state = tevent_req_data(
2197 req, struct client_state);
2198 struct ctdbd_context *ctdb = state->ctdb;
2199 struct ctdb_reply_control reply;
2200 struct database *db;
2202 reply.rdata.opcode = request->opcode;
2204 db = database_find(ctdb->db_map, request->rdata.data.db_id);
2206 reply.status = ENOENT;
2207 reply.errmsg = "Database not found";
2209 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
2210 if (reply.rdata.data.db_name == NULL) {
2211 reply.status = ENOMEM;
2212 reply.errmsg = "Memory error";
2215 reply.errmsg = NULL;
2219 client_send_control(req, header, &reply);
2222 static void control_get_pid(TALLOC_CTX *mem_ctx,
2223 struct tevent_req *req,
2224 struct ctdb_req_header *header,
2225 struct ctdb_req_control *request)
2227 struct ctdb_reply_control reply;
2229 reply.rdata.opcode = request->opcode;
2230 reply.status = getpid();
2231 reply.errmsg = NULL;
2233 client_send_control(req, header, &reply);
2236 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2237 struct tevent_req *req,
2238 struct ctdb_req_header *header,
2239 struct ctdb_req_control *request)
2241 struct ctdb_reply_control reply;
2243 reply.rdata.opcode = request->opcode;
2244 reply.status = header->destnode;
2245 reply.errmsg = NULL;
2247 client_send_control(req, header, &reply);
2250 static void control_shutdown(TALLOC_CTX *mem_ctx,
2251 struct tevent_req *req,
2252 struct ctdb_req_header *hdr,
2253 struct ctdb_req_control *request)
2255 struct client_state *state = tevent_req_data(
2256 req, struct client_state);
2261 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2262 struct tevent_req *req,
2263 struct ctdb_req_header *header,
2264 struct ctdb_req_control *request)
2266 struct client_state *state = tevent_req_data(
2267 req, struct client_state);
2268 struct ctdbd_context *ctdb = state->ctdb;
2269 struct ctdb_reply_control reply;
2272 reply.rdata.opcode = request->opcode;
2273 reply.errmsg = NULL;
2275 ret = ctdb_tunable_set_value(&ctdb->tun_list,
2276 request->rdata.data.tunable->name,
2277 request->rdata.data.tunable->value,
2281 } else if (obsolete) {
2287 client_send_control(req, header, &reply);
2290 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2291 struct tevent_req *req,
2292 struct ctdb_req_header *header,
2293 struct ctdb_req_control *request)
2295 struct client_state *state = tevent_req_data(
2296 req, struct client_state);
2297 struct ctdbd_context *ctdb = state->ctdb;
2298 struct ctdb_reply_control reply;
2302 reply.rdata.opcode = request->opcode;
2303 reply.errmsg = NULL;
2305 ret = ctdb_tunable_get_value(&ctdb->tun_list,
2306 request->rdata.data.tun_var, &value);
2310 reply.rdata.data.tun_value = value;
2314 client_send_control(req, header, &reply);
2317 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2318 struct tevent_req *req,
2319 struct ctdb_req_header *header,
2320 struct ctdb_req_control *request)
2322 struct ctdb_reply_control reply;
2323 struct ctdb_var_list *var_list;
2325 reply.rdata.opcode = request->opcode;
2326 reply.errmsg = NULL;
2328 var_list = ctdb_tunable_names(mem_ctx);
2329 if (var_list == NULL) {
2332 reply.rdata.data.tun_var_list = var_list;
2336 client_send_control(req, header, &reply);
2339 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2340 struct tevent_req *req,
2341 struct ctdb_req_header *header,
2342 struct ctdb_req_control *request)
2344 struct client_state *state = tevent_req_data(
2345 req, struct client_state);
2346 struct ctdbd_context *ctdb = state->ctdb;
2347 struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2348 struct ctdb_reply_control reply;
2351 reply.rdata.opcode = request->opcode;
2353 if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2354 (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2356 ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2357 reply.status = EINVAL;
2358 reply.errmsg = "Failed to MODIFY_FLAGS";
2359 client_send_control(req, header, &reply);
2363 /* There's all sorts of broadcast weirdness here. Only change
2364 * the specified node, not the destination node of the
2366 node = &ctdb->node_map->node[change->pnn];
2369 change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2370 (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2371 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2372 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2377 change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2378 (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2379 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2380 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2384 DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2388 reply.errmsg = NULL;
2389 client_send_control(req, header, &reply);
2392 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2393 struct tevent_req *req,
2394 struct ctdb_req_header *header,
2395 struct ctdb_req_control *request)
2397 struct client_state *state = tevent_req_data(
2398 req, struct client_state);
2399 struct ctdbd_context *ctdb = state->ctdb;
2400 struct ctdb_reply_control reply;
2402 reply.rdata.opcode = request->opcode;
2403 reply.rdata.data.tun_list = &ctdb->tun_list;
2405 reply.errmsg = NULL;
2407 client_send_control(req, header, &reply);
2410 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2411 struct tevent_req *req,
2412 struct ctdb_req_header *header,
2413 struct ctdb_req_control *request)
2415 struct client_state *state = tevent_req_data(
2416 req, struct client_state);
2417 struct ctdbd_context *ctdb = state->ctdb;
2418 struct ctdb_reply_control reply;
2419 struct database *db;
2421 reply.rdata.opcode = request->opcode;
2423 for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2424 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2429 db = database_new(ctdb->db_map, request->rdata.data.db_name,
2430 CTDB_DB_FLAGS_PERSISTENT);
2433 reply.errmsg = "Failed to attach database";
2434 client_send_control(req, header, &reply);
2439 reply.rdata.data.db_id = db->id;
2441 reply.errmsg = NULL;
2442 client_send_control(req, header, &reply);
2445 static void control_uptime(TALLOC_CTX *mem_ctx,
2446 struct tevent_req *req,
2447 struct ctdb_req_header *header,
2448 struct ctdb_req_control *request)
2450 struct client_state *state = tevent_req_data(
2451 req, struct client_state);
2452 struct ctdbd_context *ctdb = state->ctdb;
2453 struct ctdb_reply_control reply;
2454 struct ctdb_uptime *uptime;;
2456 reply.rdata.opcode = request->opcode;
2458 uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2459 if (uptime == NULL) {
2463 uptime->current_time = tevent_timeval_current();
2464 uptime->ctdbd_start_time = ctdb->start_time;
2465 uptime->last_recovery_started = ctdb->recovery_start_time;
2466 uptime->last_recovery_finished = ctdb->recovery_end_time;
2468 reply.rdata.data.uptime = uptime;
2470 reply.errmsg = NULL;
2471 client_send_control(req, header, &reply);
2476 reply.errmsg = "Memory error";
2477 client_send_control(req, header, &reply);
2480 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2481 struct tevent_req *req,
2482 struct ctdb_req_header *header,
2483 struct ctdb_req_control *request)
2485 struct client_state *state = tevent_req_data(
2486 req, struct client_state);
2487 struct ctdbd_context *ctdb = state->ctdb;
2488 struct ctdb_reply_control reply;
2489 struct ctdb_node_map *nodemap;
2490 struct node_map *node_map = ctdb->node_map;
2493 reply.rdata.opcode = request->opcode;
2495 nodemap = read_nodes_file(mem_ctx, header->destnode);
2496 if (nodemap == NULL) {
2500 for (i=0; i<nodemap->num; i++) {
2503 if (i < node_map->num_nodes &&
2504 ctdb_sock_addr_same(&nodemap->node[i].addr,
2505 &node_map->node[i].addr)) {
2509 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2512 node = &node_map->node[i];
2514 node->flags |= NODE_FLAGS_DELETED;
2515 ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2518 /* Can't happen, but Coverity... */
2525 if (i < node_map->num_nodes &&
2526 node_map->node[i].flags & NODE_FLAGS_DELETED) {
2527 node = &node_map->node[i];
2529 node->flags &= ~NODE_FLAGS_DELETED;
2530 node->addr = nodemap->node[i].addr;
2535 node_map->node = talloc_realloc(node_map, node_map->node,
2537 node_map->num_nodes+1);
2538 if (node_map->node == NULL) {
2541 node = &node_map->node[node_map->num_nodes];
2543 node->addr = nodemap->node[i].addr;
2544 node->pnn = nodemap->node[i].pnn;
2546 node->capabilities = CTDB_CAP_DEFAULT;
2547 node->recovery_disabled = false;
2548 node->recovery_substate = NULL;
2550 node_map->num_nodes += 1;
2553 talloc_free(nodemap);
2556 reply.errmsg = NULL;
2557 client_send_control(req, header, &reply);
2562 reply.errmsg = "Memory error";
2563 client_send_control(req, header, &reply);
2566 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2567 struct tevent_req *req,
2568 struct ctdb_req_header *header,
2569 struct ctdb_req_control *request)
2571 struct client_state *state = tevent_req_data(
2572 req, struct client_state);
2573 struct ctdbd_context *ctdb = state->ctdb;
2574 struct ctdb_reply_control reply;
2578 reply.rdata.opcode = request->opcode;
2580 node = &ctdb->node_map->node[header->destnode];
2581 caps = node->capabilities;
2583 if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2584 /* Don't send reply */
2588 reply.rdata.data.caps = caps;
2590 reply.errmsg = NULL;
2592 client_send_control(req, header, &reply);
2595 static void control_release_ip(TALLOC_CTX *mem_ctx,
2596 struct tevent_req *req,
2597 struct ctdb_req_header *header,
2598 struct ctdb_req_control *request)
2600 struct client_state *state = tevent_req_data(
2601 req, struct client_state);
2602 struct ctdbd_context *ctdb = state->ctdb;
2603 struct ctdb_public_ip *ip = request->rdata.data.pubip;
2604 struct ctdb_reply_control reply;
2605 struct ctdb_public_ip_list *ips = NULL;
2606 struct ctdb_public_ip *t = NULL;
2609 reply.rdata.opcode = request->opcode;
2611 if (ctdb->known_ips == NULL) {
2612 D_INFO("RELEASE_IP %s - not a public IP\n",
2613 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2617 ips = &ctdb->known_ips[header->destnode];
2620 for (i = 0; i < ips->num; i++) {
2621 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2627 D_INFO("RELEASE_IP %s - not a public IP\n",
2628 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2632 if (t->pnn != header->destnode) {
2633 if (header->destnode == ip->pnn) {
2634 D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2635 ctdb_sock_addr_to_string(mem_ctx,
2639 reply.errmsg = "RELEASE_IP to TAKE_IP node";
2640 client_send_control(req, header, &reply);
2644 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2645 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2649 D_NOTICE("RELEASE_IP %s - to node %d\n",
2650 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2657 reply.errmsg = NULL;
2658 client_send_control(req, header, &reply);
2661 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2662 struct tevent_req *req,
2663 struct ctdb_req_header *header,
2664 struct ctdb_req_control *request)
2666 struct client_state *state = tevent_req_data(
2667 req, struct client_state);
2668 struct ctdbd_context *ctdb = state->ctdb;
2669 struct ctdb_public_ip *ip = request->rdata.data.pubip;
2670 struct ctdb_reply_control reply;
2671 struct ctdb_public_ip_list *ips = NULL;
2672 struct ctdb_public_ip *t = NULL;
2675 reply.rdata.opcode = request->opcode;
2677 if (ctdb->known_ips == NULL) {
2678 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2679 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2683 ips = &ctdb->known_ips[header->destnode];
2686 for (i = 0; i < ips->num; i++) {
2687 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2693 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2694 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2698 if (t->pnn == header->destnode) {
2699 D_INFO("TAKEOVER_IP %s - redundant\n",
2700 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2702 D_NOTICE("TAKEOVER_IP %s\n",
2703 ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2709 reply.errmsg = NULL;
2710 client_send_control(req, header, &reply);
2713 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2714 struct tevent_req *req,
2715 struct ctdb_req_header *header,
2716 struct ctdb_req_control *request)
2718 struct client_state *state = tevent_req_data(
2719 req, struct client_state);
2720 struct ctdbd_context *ctdb = state->ctdb;
2721 struct ctdb_reply_control reply;
2722 struct ctdb_public_ip_list *ips = NULL;
2724 reply.rdata.opcode = request->opcode;
2726 if (ctdb->known_ips == NULL) {
2727 /* No IPs defined so create a dummy empty struct and ship it */
2728 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2730 reply.status = ENOMEM;
2731 reply.errmsg = "Memory error";
2737 ips = &ctdb->known_ips[header->destnode];
2739 if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2740 /* If runstate is not RUNNING or a node is then return
2741 * no available IPs. Don't worry about interface
2742 * states here - we're not faking down to that level.
2744 uint32_t flags = ctdb->node_map->node[header->destnode].flags;
2745 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
2746 ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
2747 /* No available IPs: return dummy empty struct */
2748 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2750 reply.status = ENOMEM;
2751 reply.errmsg = "Memory error";
2758 reply.rdata.data.pubip_list = ips;
2760 reply.errmsg = NULL;
2763 client_send_control(req, header, &reply);
2766 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2767 struct tevent_req *req,
2768 struct ctdb_req_header *header,
2769 struct ctdb_req_control *request)
2771 struct client_state *state = tevent_req_data(
2772 req, struct client_state);
2773 struct ctdbd_context *ctdb = state->ctdb;
2774 struct ctdb_reply_control reply;
2775 struct ctdb_node_map *nodemap;
2779 reply.rdata.opcode = request->opcode;
2781 nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2782 if (nodemap == NULL) {
2786 nodemap->num = ctdb->node_map->num_nodes;
2787 nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2789 if (nodemap->node == NULL) {
2793 for (i=0; i<nodemap->num; i++) {
2794 node = &ctdb->node_map->node[i];
2795 nodemap->node[i] = (struct ctdb_node_and_flags) {
2797 .flags = node->flags,
2802 reply.rdata.data.nodemap = nodemap;
2804 reply.errmsg = NULL;
2805 client_send_control(req, header, &reply);
2810 reply.errmsg = "Memory error";
2811 client_send_control(req, header, &reply);
2814 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2815 struct tevent_req *req,
2816 struct ctdb_req_header *header,
2817 struct ctdb_req_control *request)
2819 struct client_state *state = tevent_req_data(
2820 req, struct client_state);
2821 struct ctdbd_context *ctdb = state->ctdb;
2822 struct ctdb_reply_control reply;
2824 reply.rdata.opcode = request->opcode;
2826 if (ctdb->reclock != NULL) {
2827 reply.rdata.data.reclock_file =
2828 talloc_strdup(mem_ctx, ctdb->reclock);
2829 if (reply.rdata.data.reclock_file == NULL) {
2830 reply.status = ENOMEM;
2831 reply.errmsg = "Memory error";
2835 reply.rdata.data.reclock_file = NULL;
2839 reply.errmsg = NULL;
2842 client_send_control(req, header, &reply);
2845 static void control_stop_node(TALLOC_CTX *mem_ctx,
2846 struct tevent_req *req,
2847 struct ctdb_req_header *header,
2848 struct ctdb_req_control *request)
2850 struct client_state *state = tevent_req_data(
2851 req, struct client_state);
2852 struct ctdbd_context *ctdb = state->ctdb;
2853 struct ctdb_reply_control reply;
2855 reply.rdata.opcode = request->opcode;
2857 DEBUG(DEBUG_INFO, ("Stopping node\n"));
2858 ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2861 reply.errmsg = NULL;
2863 client_send_control(req, header, &reply);
2867 static void control_continue_node(TALLOC_CTX *mem_ctx,
2868 struct tevent_req *req,
2869 struct ctdb_req_header *header,
2870 struct ctdb_req_control *request)
2872 struct client_state *state = tevent_req_data(
2873 req, struct client_state);
2874 struct ctdbd_context *ctdb = state->ctdb;
2875 struct ctdb_reply_control reply;
2877 reply.rdata.opcode = request->opcode;
2879 DEBUG(DEBUG_INFO, ("Continue node\n"));
2880 ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2883 reply.errmsg = NULL;
2885 client_send_control(req, header, &reply);
2889 static void set_ban_state_callback(struct tevent_req *subreq)
2891 struct node *node = tevent_req_callback_data(
2892 subreq, struct node);
2895 status = tevent_wakeup_recv(subreq);
2896 TALLOC_FREE(subreq);
2898 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2901 node->flags &= ~NODE_FLAGS_BANNED;
2904 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2905 struct tevent_req *req,
2906 struct ctdb_req_header *header,
2907 struct ctdb_req_control *request)
2909 struct client_state *state = tevent_req_data(
2910 req, struct client_state);
2911 struct tevent_req *subreq;
2912 struct ctdbd_context *ctdb = state->ctdb;
2913 struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2914 struct ctdb_reply_control reply;
2917 reply.rdata.opcode = request->opcode;
2919 if (ban->pnn != header->destnode) {
2921 ("SET_BAN_STATE control for PNN %d rejected\n",
2923 reply.status = EINVAL;
2927 node = &ctdb->node_map->node[header->destnode];
2929 if (ban->time == 0) {
2930 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2931 node->flags &= ~NODE_FLAGS_BANNED;
2935 subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2936 tevent_timeval_current_ofs(
2938 if (subreq == NULL) {
2939 reply.status = ENOMEM;
2942 tevent_req_set_callback(subreq, set_ban_state_callback, node);
2944 DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2945 node->flags |= NODE_FLAGS_BANNED;
2946 ctdb->vnn_map->generation = INVALID_GENERATION;
2950 reply.errmsg = NULL;
2952 client_send_control(req, header, &reply);
2956 reply.errmsg = "Failed to ban node";
2959 static void control_trans3_commit(TALLOC_CTX *mem_ctx,
2960 struct tevent_req *req,
2961 struct ctdb_req_header *header,
2962 struct ctdb_req_control *request)
2964 struct client_state *state = tevent_req_data(
2965 req, struct client_state);
2966 struct ctdbd_context *ctdb = state->ctdb;
2967 struct ctdb_reply_control reply;
2968 struct database *db;
2971 reply.rdata.opcode = request->opcode;
2973 db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
2976 reply.errmsg = "Unknown database";
2977 client_send_control(req, header, &reply);
2982 (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
2984 reply.errmsg = "Transactions on volatile database";
2985 client_send_control(req, header, &reply);
2989 ret = ltdb_transaction(db, request->rdata.data.recbuf);
2992 reply.errmsg = "Transaction failed";
2993 client_send_control(req, header, &reply);
2998 reply.errmsg = NULL;
2999 client_send_control(req, header, &reply);
3002 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
3003 struct tevent_req *req,
3004 struct ctdb_req_header *header,
3005 struct ctdb_req_control *request)
3007 struct client_state *state = tevent_req_data(
3008 req, struct client_state);
3009 struct ctdbd_context *ctdb = state->ctdb;
3010 struct ctdb_reply_control reply;
3011 struct database *db;
3014 reply.rdata.opcode = request->opcode;
3016 db = database_find(ctdb->db_map, request->rdata.data.db_id);
3018 reply.status = ENOENT;
3019 reply.errmsg = "Database not found";
3023 ret = database_seqnum(db, &seqnum);
3025 reply.rdata.data.seqnum = seqnum;
3027 reply.errmsg = NULL;
3030 reply.errmsg = "Failed to get seqnum";
3034 client_send_control(req, header, &reply);
3037 static void control_db_get_health(TALLOC_CTX *mem_ctx,
3038 struct tevent_req *req,
3039 struct ctdb_req_header *header,
3040 struct ctdb_req_control *request)
3042 struct client_state *state = tevent_req_data(
3043 req, struct client_state);
3044 struct ctdbd_context *ctdb = state->ctdb;
3045 struct ctdb_reply_control reply;
3046 struct database *db;
3048 reply.rdata.opcode = request->opcode;
3050 db = database_find(ctdb->db_map, request->rdata.data.db_id);
3052 reply.status = ENOENT;
3053 reply.errmsg = "Database not found";
3055 reply.rdata.data.reason = NULL;
3057 reply.errmsg = NULL;
3060 client_send_control(req, header, &reply);
3063 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
3064 struct ctdbd_context *ctdb)
3066 struct ctdb_iface_list *iface_list;
3067 struct interface *iface;
3070 iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
3071 if (iface_list == NULL) {
3075 iface_list->num = ctdb->iface_map->num;
3076 iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
3078 if (iface_list->iface == NULL) {
3079 TALLOC_FREE(iface_list);
3083 for (i=0; i<iface_list->num; i++) {
3084 iface = &ctdb->iface_map->iface[i];
3085 iface_list->iface[i] = (struct ctdb_iface) {
3086 .link_state = iface->link_up,
3087 .references = iface->references,
3089 strlcpy(iface_list->iface[i].name, iface->name,
3090 sizeof(iface_list->iface[i].name));
3097 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
3098 struct tevent_req *req,
3099 struct ctdb_req_header *header,
3100 struct ctdb_req_control *request)
3102 struct client_state *state = tevent_req_data(
3103 req, struct client_state);
3104 struct ctdbd_context *ctdb = state->ctdb;
3105 struct ctdb_reply_control reply;
3106 ctdb_sock_addr *addr = request->rdata.data.addr;
3107 struct ctdb_public_ip_list *known = NULL;
3108 struct ctdb_public_ip_info *info = NULL;
3111 reply.rdata.opcode = request->opcode;
3113 info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
3115 reply.status = ENOMEM;
3116 reply.errmsg = "Memory error";
3120 reply.rdata.data.ipinfo = info;
3122 if (ctdb->known_ips != NULL) {
3123 known = &ctdb->known_ips[header->destnode];
3125 /* No IPs defined so create a dummy empty struct and
3126 * fall through. The given IP won't be matched
3129 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
3130 if (known == NULL) {
3131 reply.status = ENOMEM;
3132 reply.errmsg = "Memory error";
3137 for (i = 0; i < known->num; i++) {
3138 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
3144 if (i == known->num) {
3145 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
3146 ctdb_sock_addr_to_string(mem_ctx, addr, false));
3148 reply.errmsg = "Unknown address";
3152 info->ip = known->ip[i];
3154 /* The fake PUBLICIPS stanza and resulting known_ips data
3155 * don't know anything about interfaces, so completely fake
3158 info->active_idx = 0;
3160 info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
3161 if (info->ifaces == NULL) {
3162 reply.status = ENOMEM;
3163 reply.errmsg = "Memory error";
3168 reply.errmsg = NULL;
3171 client_send_control(req, header, &reply);
3174 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
3175 struct tevent_req *req,
3176 struct ctdb_req_header *header,
3177 struct ctdb_req_control *request)
3179 struct client_state *state = tevent_req_data(
3180 req, struct client_state);
3181 struct ctdbd_context *ctdb = state->ctdb;
3182 struct ctdb_reply_control reply;
3183 struct ctdb_iface_list *iface_list;
3185 reply.rdata.opcode = request->opcode;
3187 iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
3188 if (iface_list == NULL) {
3192 reply.rdata.data.iface_list = iface_list;
3194 reply.errmsg = NULL;
3195 client_send_control(req, header, &reply);
3200 reply.errmsg = "Memory error";
3201 client_send_control(req, header, &reply);
3204 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
3205 struct tevent_req *req,
3206 struct ctdb_req_header *header,
3207 struct ctdb_req_control *request)
3209 struct client_state *state = tevent_req_data(
3210 req, struct client_state);
3211 struct ctdbd_context *ctdb = state->ctdb;
3212 struct ctdb_reply_control reply;
3213 struct ctdb_iface *in_iface;
3214 struct interface *iface = NULL;
3215 bool link_up = false;
3218 reply.rdata.opcode = request->opcode;
3220 in_iface = request->rdata.data.iface;
3222 if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
3223 reply.errmsg = "interface name not terminated";
3227 switch (in_iface->link_state) {
3237 reply.errmsg = "invalid link state";
3241 if (in_iface->references != 0) {
3242 reply.errmsg = "references should be 0";
3246 for (i=0; i<ctdb->iface_map->num; i++) {
3247 if (strcmp(ctdb->iface_map->iface[i].name,
3248 in_iface->name) == 0) {
3249 iface = &ctdb->iface_map->iface[i];
3254 if (iface == NULL) {
3255 reply.errmsg = "interface not found";
3259 iface->link_up = link_up;
3262 reply.errmsg = NULL;
3263 client_send_control(req, header, &reply);
3268 client_send_control(req, header, &reply);
3271 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
3272 struct tevent_req *req,
3273 struct ctdb_req_header *header,
3274 struct ctdb_req_control *request)
3276 struct client_state *state = tevent_req_data(
3277 req, struct client_state);
3278 struct ctdbd_context *ctdb = state->ctdb;
3279 struct ctdb_reply_control reply;
3280 struct database *db;
3282 reply.rdata.opcode = request->opcode;
3284 db = database_find(ctdb->db_map, request->rdata.data.db_id);
3286 reply.status = ENOENT;
3287 reply.errmsg = "Database not found";
3291 if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3292 reply.status = EINVAL;
3293 reply.errmsg = "Can not set READONLY on persistent db";
3297 db->flags |= CTDB_DB_FLAGS_READONLY;
3299 reply.errmsg = NULL;
3302 client_send_control(req, header, &reply);
3305 struct traverse_start_ext_state {
3306 struct tevent_req *req;
3307 struct ctdb_req_header *header;
3310 bool withemptyrecords;
3314 static int traverse_start_ext_handler(struct tdb_context *tdb,
3315 TDB_DATA key, TDB_DATA data,
3318 struct traverse_start_ext_state *state =
3319 (struct traverse_start_ext_state *)private_data;
3320 struct ctdb_rec_data rec;
3321 struct ctdb_req_message_data message;
3324 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3328 if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
3329 (!state->withemptyrecords)) {
3333 rec = (struct ctdb_rec_data) {
3334 .reqid = state->reqid,
3340 message.srvid = state->srvid;
3341 message.data.dsize = ctdb_rec_data_len(&rec);
3342 message.data.dptr = talloc_size(state->req, message.data.dsize);
3343 if (message.data.dptr == NULL) {
3344 state->status = ENOMEM;
3348 ctdb_rec_data_push(&rec, message.data.dptr, &np);
3349 client_send_message(state->req, state->header, &message);
3351 talloc_free(message.data.dptr);
3356 static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
3357 struct tevent_req *req,
3358 struct ctdb_req_header *header,
3359 struct ctdb_req_control *request)
3361 struct client_state *state = tevent_req_data(
3362 req, struct client_state);
3363 struct ctdbd_context *ctdb = state->ctdb;
3364 struct ctdb_reply_control reply;
3365 struct database *db;
3366 struct ctdb_traverse_start_ext *ext;
3367 struct traverse_start_ext_state t_state;
3368 struct ctdb_rec_data rec;
3369 struct ctdb_req_message_data message;
3374 reply.rdata.opcode = request->opcode;
3376 ext = request->rdata.data.traverse_start_ext;
3378 db = database_find(ctdb->db_map, ext->db_id);
3381 reply.errmsg = "Unknown database";
3382 client_send_control(req, header, &reply);
3386 t_state = (struct traverse_start_ext_state) {
3389 .reqid = ext->reqid,
3390 .srvid = ext->srvid,
3391 .withemptyrecords = ext->withemptyrecords,
3394 ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
3395 DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
3396 if (t_state.status != 0) {
3398 reply.errmsg = "Memory error";
3399 client_send_control(req, header, &reply);
3403 client_send_control(req, header, &reply);
3405 rec = (struct ctdb_rec_data) {
3406 .reqid = ext->reqid,
3412 message.srvid = ext->srvid;
3413 message.data.dsize = ctdb_rec_data_len(&rec);
3414 ctdb_rec_data_push(&rec, buffer, &np);
3415 message.data.dptr = buffer;
3416 client_send_message(req, header, &message);
3419 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3420 struct tevent_req *req,
3421 struct ctdb_req_header *header,
3422 struct ctdb_req_control *request)
3424 struct client_state *state = tevent_req_data(
3425 req, struct client_state);
3426 struct ctdbd_context *ctdb = state->ctdb;
3427 struct ctdb_reply_control reply;
3428 struct database *db;
3430 reply.rdata.opcode = request->opcode;
3432 db = database_find(ctdb->db_map, request->rdata.data.db_id);
3434 reply.status = ENOENT;
3435 reply.errmsg = "Database not found";
3439 if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3440 reply.status = EINVAL;
3441 reply.errmsg = "Can not set STICKY on persistent db";
3445 db->flags |= CTDB_DB_FLAGS_STICKY;
3447 reply.errmsg = NULL;
3450 client_send_control(req, header, &reply);
3453 static void control_start_ipreallocate(TALLOC_CTX *mem_ctx,
3454 struct tevent_req *req,
3455 struct ctdb_req_header *header,
3456 struct ctdb_req_control *request)
3458 struct ctdb_reply_control reply;
3460 /* Always succeed */
3461 reply.rdata.opcode = request->opcode;
3463 reply.errmsg = NULL;
3465 client_send_control(req, header, &reply);
3468 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3469 struct tevent_req *req,
3470 struct ctdb_req_header *header,
3471 struct ctdb_req_control *request)
3473 struct ctdb_reply_control reply;
3475 /* Always succeed */
3476 reply.rdata.opcode = request->opcode;
3478 reply.errmsg = NULL;
3480 client_send_control(req, header, &reply);
3483 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3484 struct tevent_req *req,
3485 struct ctdb_req_header *header,
3486 struct ctdb_req_control *request)
3488 struct client_state *state = tevent_req_data(
3489 req, struct client_state);
3490 struct ctdbd_context *ctdb = state->ctdb;
3491 struct ctdb_reply_control reply;
3493 reply.rdata.opcode = request->opcode;
3494 reply.rdata.data.runstate = ctdb->runstate;
3496 reply.errmsg = NULL;
3498 client_send_control(req, header, &reply);
3501 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3502 struct tevent_req *req,
3503 struct ctdb_req_header *header,
3504 struct ctdb_req_control *request)
3506 struct ctdb_reply_control reply;
3507 struct ctdb_node_map *nodemap;
3509 reply.rdata.opcode = request->opcode;
3511 nodemap = read_nodes_file(mem_ctx, header->destnode);
3512 if (nodemap == NULL) {
3516 reply.rdata.data.nodemap = nodemap;
3518 reply.errmsg = NULL;
3519 client_send_control(req, header, &reply);
3524 reply.errmsg = "Failed to read nodes file";
3525 client_send_control(req, header, &reply);
3528 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3529 struct tevent_req *req,
3530 struct ctdb_req_header *header,
3531 struct ctdb_req_control *request)
3533 struct client_state *state = tevent_req_data(
3534 req, struct client_state);
3535 struct ctdbd_context *ctdb = state->ctdb;
3536 struct ctdb_reply_control reply;
3537 struct database *db;
3539 reply.rdata.opcode = request->opcode;
3541 db = database_find(ctdb->db_map, request->rdata.data.db_id);
3543 reply.status = ENOENT;
3544 reply.errmsg = "Database not found";
3546 reply.rdata.data.tdb_flags = database_flags(db->flags);
3548 reply.errmsg = NULL;
3551 client_send_control(req, header, &reply);
3554 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3555 struct tevent_req *req,
3556 struct ctdb_req_header *header,
3557 struct ctdb_req_control *request)
3559 struct client_state *state = tevent_req_data(
3560 req, struct client_state);
3561 struct ctdbd_context *ctdb = state->ctdb;
3562 struct ctdb_reply_control reply;
3563 struct database *db;
3565 reply.rdata.opcode = request->opcode;
3567 for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3568 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3573 db = database_new(ctdb->db_map, request->rdata.data.db_name,
3574 CTDB_DB_FLAGS_REPLICATED);
3577 reply.errmsg = "Failed to attach database";
3578 client_send_control(req, header, &reply);
3583 reply.rdata.data.db_id = db->id;
3585 reply.errmsg = NULL;
3586 client_send_control(req, header, &reply);
3589 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3590 struct tevent_req *req,
3591 struct ctdb_req_header *header,
3592 struct ctdb_req_control *request)
3594 struct client_state *state = tevent_req_data(
3595 req, struct client_state);
3596 struct ctdbd_context *ctdb = state->ctdb;
3597 struct ctdb_client *client;
3598 struct client_state *cstate;
3599 struct ctdb_reply_control reply;
3600 bool pid_found, srvid_found;
3603 reply.rdata.opcode = request->opcode;
3606 srvid_found = false;
3608 for (client=ctdb->client_list; client != NULL; client=client->next) {
3609 if (client->pid == request->rdata.data.pid_srvid->pid) {
3611 cstate = (struct client_state *)client->state;
3612 ret = srvid_exists(ctdb->srv,
3613 request->rdata.data.pid_srvid->srvid,
3617 ret = kill(cstate->pid, 0);
3620 reply.errmsg = strerror(errno);
3623 reply.errmsg = NULL;
3631 reply.errmsg = "No client for PID";
3632 } else if (! srvid_found) {
3634 reply.errmsg = "No client for PID and SRVID";
3637 client_send_control(req, header, &reply);
3640 static void control_disable_node(TALLOC_CTX *mem_ctx,
3641 struct tevent_req *req,
3642 struct ctdb_req_header *header,
3643 struct ctdb_req_control *request)
3645 struct client_state *state = tevent_req_data(
3646 req, struct client_state);
3647 struct ctdbd_context *ctdb = state->ctdb;
3648 struct ctdb_reply_control reply;
3650 reply.rdata.opcode = request->opcode;
3652 DEBUG(DEBUG_INFO, ("Disabling node\n"));
3653 ctdb->node_map->node[header->destnode].flags |=
3654 NODE_FLAGS_PERMANENTLY_DISABLED;
3657 reply.errmsg = NULL;
3659 client_send_control(req, header, &reply);
3663 static void control_enable_node(TALLOC_CTX *mem_ctx,
3664 struct tevent_req *req,
3665 struct ctdb_req_header *header,
3666 struct ctdb_req_control *request)
3668 struct client_state *state = tevent_req_data(
3669 req, struct client_state);
3670 struct ctdbd_context *ctdb = state->ctdb;
3671 struct ctdb_reply_control reply;
3673 reply.rdata.opcode = request->opcode;
3675 DEBUG(DEBUG_INFO, ("Enable node\n"));
3676 ctdb->node_map->node[header->destnode].flags &=
3677 ~NODE_FLAGS_PERMANENTLY_DISABLED;
3680 reply.errmsg = NULL;
3682 client_send_control(req, header, &reply);
3686 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3687 struct tevent_req *req,
3688 struct ctdb_req_header *header,
3689 struct ctdb_req_control *request)
3691 struct client_state *state = tevent_req_data(
3692 req, struct client_state);
3693 struct ctdbd_context *ctdb = state->ctdb;
3694 struct ctdb_reply_control reply;
3695 struct fake_control_failure *f = NULL;
3697 D_DEBUG("Checking fake control failure for control %u on node %u\n",
3698 request->opcode, header->destnode);
3699 for (f = ctdb->control_failures; f != NULL; f = f->next) {
3700 if (f->opcode == request->opcode &&
3701 (f->pnn == header->destnode ||
3702 f->pnn == CTDB_UNKNOWN_PNN)) {
3704 reply.rdata.opcode = request->opcode;
3705 if (strcmp(f->error, "TIMEOUT") == 0) {
3706 /* Causes no reply */
3707 D_ERR("Control %u fake timeout on node %u\n",
3708 request->opcode, header->destnode);
3710 } else if (strcmp(f->error, "ERROR") == 0) {
3711 D_ERR("Control %u fake error on node %u\n",
3712 request->opcode, header->destnode);
3714 reply.errmsg = f->comment;
3715 client_send_control(req, header, &reply);
3724 static void control_error(TALLOC_CTX *mem_ctx,
3725 struct tevent_req *req,
3726 struct ctdb_req_header *header,
3727 struct ctdb_req_control *request)
3729 struct ctdb_reply_control reply;
3731 D_DEBUG("Control %u not implemented\n", request->opcode);
3733 reply.rdata.opcode = request->opcode;
3735 reply.errmsg = "Not implemented";
3737 client_send_control(req, header, &reply);
3741 * Handling protocol - messages
3744 struct disable_recoveries_state {
3748 static void disable_recoveries_callback(struct tevent_req *subreq)
3750 struct disable_recoveries_state *substate = tevent_req_callback_data(
3751 subreq, struct disable_recoveries_state);
3754 status = tevent_wakeup_recv(subreq);
3755 TALLOC_FREE(subreq);
3757 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3760 substate->node->recovery_disabled = false;
3761 TALLOC_FREE(substate->node->recovery_substate);
3764 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3765 struct tevent_req *req,
3766 struct ctdb_req_header *header,
3767 struct ctdb_req_message *request)
3769 struct client_state *state = tevent_req_data(
3770 req, struct client_state);
3771 struct tevent_req *subreq;
3772 struct ctdbd_context *ctdb = state->ctdb;
3773 struct disable_recoveries_state *substate;
3774 struct ctdb_disable_message *disable = request->data.disable;
3775 struct ctdb_req_message_data reply;
3780 node = &ctdb->node_map->node[header->destnode];
3782 if (disable->timeout == 0) {
3783 TALLOC_FREE(node->recovery_substate);
3784 node->recovery_disabled = false;
3785 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3790 substate = talloc_zero(ctdb->node_map,
3791 struct disable_recoveries_state);
3792 if (substate == NULL) {
3796 substate->node = node;
3798 subreq = tevent_wakeup_send(substate, state->ev,
3799 tevent_timeval_current_ofs(
3800 disable->timeout, 0));
3801 if (subreq == NULL) {
3802 talloc_free(substate);
3805 tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3807 DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3808 disable->timeout, header->destnode));
3809 node->recovery_substate = substate;
3810 node->recovery_disabled = true;
3813 ret = header->destnode;
3816 reply.srvid = disable->srvid;
3817 data.dptr = (uint8_t *)&ret;
3818 data.dsize = sizeof(int);
3821 client_send_message(req, header, &reply);
3824 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3825 struct tevent_req *req,
3826 struct ctdb_req_header *header,
3827 struct ctdb_req_message *request)
3829 struct client_state *state = tevent_req_data(
3830 req, struct client_state);
3831 struct ctdbd_context *ctdb = state->ctdb;
3832 struct ctdb_srvid_message *srvid = request->data.msg;
3833 struct ctdb_req_message_data reply;
3837 if (header->destnode != ctdb->node_map->recmaster) {
3838 /* No reply! Only recmaster replies... */
3842 DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3844 ret = header->destnode;
3846 reply.srvid = srvid->srvid;
3847 data.dptr = (uint8_t *)&ret;
3848 data.dsize = sizeof(int);
3851 client_send_message(req, header, &reply);
3855 * Handle a single client
3858 static void client_read_handler(uint8_t *buf, size_t buflen,
3859 void *private_data);
3860 static void client_dead_handler(void *private_data);
3861 static void client_process_packet(struct tevent_req *req,
3862 uint8_t *buf, size_t buflen);
3863 static void client_process_call(struct tevent_req *req,
3864 uint8_t *buf, size_t buflen);
3865 static void client_process_message(struct tevent_req *req,
3866 uint8_t *buf, size_t buflen);
3867 static void client_process_control(struct tevent_req *req,
3868 uint8_t *buf, size_t buflen);
3869 static void client_reply_done(struct tevent_req *subreq);
3871 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3872 struct tevent_context *ev,
3873 int fd, struct ctdbd_context *ctdb,
3876 struct tevent_req *req;
3877 struct client_state *state;
3880 req = tevent_req_create(mem_ctx, &state, struct client_state);
3890 (void) ctdb_get_peer_pid(fd, &state->pid);
3892 ret = comm_setup(state, ev, fd, client_read_handler, req,
3893 client_dead_handler, req, &state->comm);
3895 tevent_req_error(req, ret);
3896 return tevent_req_post(req, ev);
3899 ret = client_add(ctdb, state->pid, state);
3901 tevent_req_error(req, ret);
3902 return tevent_req_post(req, ev);
3905 DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3910 static void client_read_handler(uint8_t *buf, size_t buflen,
3913 struct tevent_req *req = talloc_get_type_abort(
3914 private_data, struct tevent_req);
3915 struct client_state *state = tevent_req_data(
3916 req, struct client_state);
3917 struct ctdbd_context *ctdb = state->ctdb;
3918 struct ctdb_req_header header;
3923 ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3928 if (buflen != header.length) {
3932 ret = ctdb_req_header_verify(&header, 0);
3937 header_fix_pnn(&header, ctdb);
3939 if (header.destnode == CTDB_BROADCAST_ALL) {
3940 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3941 header.destnode = i;
3943 ctdb_req_header_push(&header, buf, &np);
3944 client_process_packet(req, buf, buflen);
3949 if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3950 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3951 if (ctdb->node_map->node[i].flags &
3952 NODE_FLAGS_DISCONNECTED) {
3956 header.destnode = i;
3958 ctdb_req_header_push(&header, buf, &np);
3959 client_process_packet(req, buf, buflen);
3964 if (header.destnode > ctdb->node_map->num_nodes) {
3965 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3971 if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3972 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3977 ctdb_req_header_push(&header, buf, &np);
3978 client_process_packet(req, buf, buflen);
3981 static void client_dead_handler(void *private_data)
3983 struct tevent_req *req = talloc_get_type_abort(
3984 private_data, struct tevent_req);
3986 tevent_req_done(req);
3989 static void client_process_packet(struct tevent_req *req,
3990 uint8_t *buf, size_t buflen)
3992 struct ctdb_req_header header;
3996 ret = ctdb_req_header_pull(buf, buflen, &header, &np);
4001 switch (header.operation) {
4003 client_process_call(req, buf, buflen);
4006 case CTDB_REQ_MESSAGE:
4007 client_process_message(req, buf, buflen);
4010 case CTDB_REQ_CONTROL:
4011 client_process_control(req, buf, buflen);
4019 static void client_process_call(struct tevent_req *req,
4020 uint8_t *buf, size_t buflen)
4022 struct client_state *state = tevent_req_data(
4023 req, struct client_state);
4024 struct ctdbd_context *ctdb = state->ctdb;
4025 TALLOC_CTX *mem_ctx;
4026 struct ctdb_req_header header;
4027 struct ctdb_req_call request;
4028 struct ctdb_reply_call reply;
4029 struct database *db;
4030 struct ctdb_ltdb_header hdr;
4034 mem_ctx = talloc_new(state);
4035 if (tevent_req_nomem(mem_ctx, req)) {
4039 ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
4041 talloc_free(mem_ctx);
4042 tevent_req_error(req, ret);
4046 header_fix_pnn(&header, ctdb);
4048 if (header.destnode >= ctdb->node_map->num_nodes) {
4052 DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
4054 db = database_find(ctdb->db_map, request.db_id);
4059 ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
4064 /* Fake migration */
4065 if (hdr.dmaster != ctdb->node_map->pnn) {
4066 hdr.dmaster = ctdb->node_map->pnn;
4068 ret = ltdb_store(db, request.key, &hdr, data);
4074 talloc_free(mem_ctx);
4077 reply.data = tdb_null;
4079 client_send_call(req, &header, &reply);
4083 talloc_free(mem_ctx);
4085 reply.data = tdb_null;
4087 client_send_call(req, &header, &reply);
4090 static void client_process_message(struct tevent_req *req,
4091 uint8_t *buf, size_t buflen)
4093 struct client_state *state = tevent_req_data(
4094 req, struct client_state);
4095 struct ctdbd_context *ctdb = state->ctdb;
4096 TALLOC_CTX *mem_ctx;
4097 struct ctdb_req_header header;
4098 struct ctdb_req_message request;
4102 mem_ctx = talloc_new(state);
4103 if (tevent_req_nomem(mem_ctx, req)) {
4107 ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
4109 talloc_free(mem_ctx);
4110 tevent_req_error(req, ret);
4114 header_fix_pnn(&header, ctdb);
4116 if (header.destnode >= ctdb->node_map->num_nodes) {
4117 /* Many messages are not replied to, so just behave as
4118 * though this message was not received */
4119 fprintf(stderr, "Invalid node %d\n", header.destnode);
4120 talloc_free(mem_ctx);
4124 srvid = request.srvid;
4125 DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
4127 if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
4128 message_disable_recoveries(mem_ctx, req, &header, &request);
4129 } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
4130 message_takeover_run(mem_ctx, req, &header, &request);
4132 D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
4136 talloc_free(mem_ctx);
4139 static void client_process_control(struct tevent_req *req,
4140 uint8_t *buf, size_t buflen)
4142 struct client_state *state = tevent_req_data(
4143 req, struct client_state);
4144 struct ctdbd_context *ctdb = state->ctdb;
4145 TALLOC_CTX *mem_ctx;
4146 struct ctdb_req_header header;
4147 struct ctdb_req_control request;
4150 mem_ctx = talloc_new(state);
4151 if (tevent_req_nomem(mem_ctx, req)) {
4155 ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
4157 talloc_free(mem_ctx);
4158 tevent_req_error(req, ret);
4162 header_fix_pnn(&header, ctdb);
4164 if (header.destnode >= ctdb->node_map->num_nodes) {
4165 struct ctdb_reply_control reply;
4167 reply.rdata.opcode = request.opcode;
4168 reply.errmsg = "Invalid node";
4170 client_send_control(req, &header, &reply);
4174 DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
4175 request.opcode, header.reqid));
4177 if (fake_control_failure(mem_ctx, req, &header, &request)) {
4181 switch (request.opcode) {
4182 case CTDB_CONTROL_PROCESS_EXISTS:
4183 control_process_exists(mem_ctx, req, &header, &request);
4186 case CTDB_CONTROL_PING:
4187 control_ping(mem_ctx, req, &header, &request);
4190 case CTDB_CONTROL_GETDBPATH:
4191 control_getdbpath(mem_ctx, req, &header, &request);
4194 case CTDB_CONTROL_GETVNNMAP:
4195 control_getvnnmap(mem_ctx, req, &header, &request);
4198 case CTDB_CONTROL_GET_DEBUG:
4199 control_get_debug(mem_ctx, req, &header, &request);
4202 case CTDB_CONTROL_SET_DEBUG:
4203 control_set_debug(mem_ctx, req, &header, &request);
4206 case CTDB_CONTROL_GET_DBMAP:
4207 control_get_dbmap(mem_ctx, req, &header, &request);
4210 case CTDB_CONTROL_GET_RECMODE:
4211 control_get_recmode(mem_ctx, req, &header, &request);
4214 case CTDB_CONTROL_SET_RECMODE:
4215 control_set_recmode(mem_ctx, req, &header, &request);
4218 case CTDB_CONTROL_DB_ATTACH:
4219 control_db_attach(mem_ctx, req, &header, &request);
4222 case CTDB_CONTROL_REGISTER_SRVID:
4223 control_register_srvid(mem_ctx, req, &header, &request);
4226 case CTDB_CONTROL_DEREGISTER_SRVID:
4227 control_deregister_srvid(mem_ctx, req, &header, &request);
4230 case CTDB_CONTROL_GET_DBNAME:
4231 control_get_dbname(mem_ctx, req, &header, &request);
4234 case CTDB_CONTROL_GET_PID:
4235 control_get_pid(mem_ctx, req, &header, &request);
4238 case CTDB_CONTROL_GET_PNN:
4239 control_get_pnn(mem_ctx, req, &header, &request);
4242 case CTDB_CONTROL_SHUTDOWN:
4243 control_shutdown(mem_ctx, req, &header, &request);
4246 case CTDB_CONTROL_SET_TUNABLE:
4247 control_set_tunable(mem_ctx, req, &header, &request);
4250 case CTDB_CONTROL_GET_TUNABLE:
4251 control_get_tunable(mem_ctx, req, &header, &request);
4254 case CTDB_CONTROL_LIST_TUNABLES:
4255 control_list_tunables(mem_ctx, req, &header, &request);
4258 case CTDB_CONTROL_MODIFY_FLAGS:
4259 control_modify_flags(mem_ctx, req, &header, &request);
4262 case CTDB_CONTROL_GET_ALL_TUNABLES:
4263 control_get_all_tunables(mem_ctx, req, &header, &request);
4266 case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
4267 control_db_attach_persistent(mem_ctx, req, &header, &request);
4270 case CTDB_CONTROL_UPTIME:
4271 control_uptime(mem_ctx, req, &header, &request);
4274 case CTDB_CONTROL_RELOAD_NODES_FILE:
4275 control_reload_nodes_file(mem_ctx, req, &header, &request);
4278 case CTDB_CONTROL_GET_CAPABILITIES:
4279 control_get_capabilities(mem_ctx, req, &header, &request);
4282 case CTDB_CONTROL_RELEASE_IP:
4283 control_release_ip(mem_ctx, req, &header, &request);
4286 case CTDB_CONTROL_TAKEOVER_IP:
4287 control_takeover_ip(mem_ctx, req, &header, &request);
4290 case CTDB_CONTROL_GET_PUBLIC_IPS:
4291 control_get_public_ips(mem_ctx, req, &header, &request);
4294 case CTDB_CONTROL_GET_NODEMAP:
4295 control_get_nodemap(mem_ctx, req, &header, &request);
4298 case CTDB_CONTROL_GET_RECLOCK_FILE:
4299 control_get_reclock_file(mem_ctx, req, &header, &request);
4302 case CTDB_CONTROL_STOP_NODE:
4303 control_stop_node(mem_ctx, req, &header, &request);
4306 case CTDB_CONTROL_CONTINUE_NODE:
4307 control_continue_node(mem_ctx, req, &header, &request);
4310 case CTDB_CONTROL_SET_BAN_STATE:
4311 control_set_ban_state(mem_ctx, req, &header, &request);
4314 case CTDB_CONTROL_TRANS3_COMMIT:
4315 control_trans3_commit(mem_ctx, req, &header, &request);
4318 case CTDB_CONTROL_GET_DB_SEQNUM:
4319 control_get_db_seqnum(mem_ctx, req, &header, &request);
4322 case CTDB_CONTROL_DB_GET_HEALTH:
4323 control_db_get_health(mem_ctx, req, &header, &request);
4326 case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
4327 control_get_public_ip_info(mem_ctx, req, &header, &request);
4330 case CTDB_CONTROL_GET_IFACES:
4331 control_get_ifaces(mem_ctx, req, &header, &request);
4334 case CTDB_CONTROL_SET_IFACE_LINK_STATE:
4335 control_set_iface_link_state(mem_ctx, req, &header, &request);
4338 case CTDB_CONTROL_SET_DB_READONLY:
4339 control_set_db_readonly(mem_ctx, req, &header, &request);
4342 case CTDB_CONTROL_TRAVERSE_START_EXT:
4343 control_traverse_start_ext(mem_ctx, req, &header, &request);
4346 case CTDB_CONTROL_SET_DB_STICKY:
4347 control_set_db_sticky(mem_ctx, req, &header, &request);
4350 case CTDB_CONTROL_IPREALLOCATED:
4351 control_ipreallocated(mem_ctx, req, &header, &request);
4354 case CTDB_CONTROL_GET_RUNSTATE:
4355 control_get_runstate(mem_ctx, req, &header, &request);
4358 case CTDB_CONTROL_GET_NODES_FILE:
4359 control_get_nodes_file(mem_ctx, req, &header, &request);
4362 case CTDB_CONTROL_DB_OPEN_FLAGS:
4363 control_db_open_flags(mem_ctx, req, &header, &request);
4366 case CTDB_CONTROL_DB_ATTACH_REPLICATED:
4367 control_db_attach_replicated(mem_ctx, req, &header, &request);
4370 case CTDB_CONTROL_CHECK_PID_SRVID:
4371 control_check_pid_srvid(mem_ctx, req, &header, &request);
4374 case CTDB_CONTROL_DISABLE_NODE:
4375 control_disable_node(mem_ctx, req, &header, &request);
4378 case CTDB_CONTROL_ENABLE_NODE:
4379 control_enable_node(mem_ctx, req, &header, &request);
4382 case CTDB_CONTROL_START_IPREALLOCATE:
4383 control_start_ipreallocate(mem_ctx, req, &header, &request);
4387 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
4388 control_error(mem_ctx, req, &header, &request);
4394 talloc_free(mem_ctx);
4397 static int client_recv(struct tevent_req *req, int *perr)
4399 struct client_state *state = tevent_req_data(
4400 req, struct client_state);
4403 DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
4406 if (tevent_req_is_unix_error(req, &err)) {
4413 return state->status;
4420 struct server_state {
4421 struct tevent_context *ev;
4422 struct ctdbd_context *ctdb;
4423 struct tevent_timer *leader_broadcast_te;
4427 static void server_leader_broadcast(struct tevent_context *ev,
4428 struct tevent_timer *te,
4429 struct timeval current_time,
4430 void *private_data);
4431 static void server_new_client(struct tevent_req *subreq);
4432 static void server_client_done(struct tevent_req *subreq);
4434 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
4435 struct tevent_context *ev,
4436 struct ctdbd_context *ctdb,
4439 struct tevent_req *req, *subreq;
4440 struct server_state *state;
4442 req = tevent_req_create(mem_ctx, &state, struct server_state);
4451 state->leader_broadcast_te = tevent_add_timer(state->ev,
4453 timeval_current_ofs(0, 0),
4454 server_leader_broadcast,
4456 if (state->leader_broadcast_te == NULL) {
4457 DBG_WARNING("Failed to set up leader broadcast\n");
4460 subreq = accept_send(state, ev, fd);
4461 if (tevent_req_nomem(subreq, req)) {
4462 return tevent_req_post(req, ev);
4464 tevent_req_set_callback(subreq, server_new_client, req);
4469 static void server_leader_broadcast(struct tevent_context *ev,
4470 struct tevent_timer *te,
4471 struct timeval current_time,
4474 struct server_state *state = talloc_get_type_abort(
4475 private_data, struct server_state);
4476 struct ctdbd_context *ctdb = state->ctdb;
4477 uint32_t leader = ctdb->node_map->recmaster;
4481 if (leader == CTDB_UNKNOWN_PNN) {
4485 data.dptr = (uint8_t *)&leader;
4486 data.dsize = sizeof(leader);
4488 ret = srvid_dispatch(ctdb->srv, CTDB_SRVID_LEADER, 0, data);
4490 DBG_WARNING("Failed to send leader broadcast, ret=%d\n", ret);
4494 state->leader_broadcast_te = tevent_add_timer(state->ev,
4496 timeval_current_ofs(1, 0),
4497 server_leader_broadcast,
4499 if (state->leader_broadcast_te == NULL) {
4500 DBG_WARNING("Failed to set up leader broadcast\n");
4504 static void server_new_client(struct tevent_req *subreq)
4506 struct tevent_req *req = tevent_req_callback_data(
4507 subreq, struct tevent_req);
4508 struct server_state *state = tevent_req_data(
4509 req, struct server_state);
4510 struct ctdbd_context *ctdb = state->ctdb;
4514 client_fd = accept_recv(subreq, NULL, NULL, &ret);
4515 TALLOC_FREE(subreq);
4516 if (client_fd == -1) {
4517 tevent_req_error(req, ret);
4521 subreq = client_send(state, state->ev, client_fd,
4522 ctdb, ctdb->node_map->pnn);
4523 if (tevent_req_nomem(subreq, req)) {
4526 tevent_req_set_callback(subreq, server_client_done, req);
4528 ctdb->num_clients += 1;
4530 subreq = accept_send(state, state->ev, state->fd);
4531 if (tevent_req_nomem(subreq, req)) {
4534 tevent_req_set_callback(subreq, server_new_client, req);
4537 static void server_client_done(struct tevent_req *subreq)
4539 struct tevent_req *req = tevent_req_callback_data(
4540 subreq, struct tevent_req);
4541 struct server_state *state = tevent_req_data(
4542 req, struct server_state);
4543 struct ctdbd_context *ctdb = state->ctdb;
4547 status = client_recv(subreq, &ret);
4548 TALLOC_FREE(subreq);
4550 tevent_req_error(req, ret);
4554 ctdb->num_clients -= 1;
4557 /* Special status, to shutdown server */
4558 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
4559 tevent_req_done(req);
4563 static bool server_recv(struct tevent_req *req, int *perr)
4567 if (tevent_req_is_unix_error(req, &err)) {
4580 static int socket_init(const char *sockpath)
4582 struct sockaddr_un addr;
4586 memset(&addr, 0, sizeof(addr));
4587 addr.sun_family = AF_UNIX;
4589 len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
4590 if (len >= sizeof(addr.sun_path)) {
4591 fprintf(stderr, "path too long: %s\n", sockpath);
4595 fd = socket(AF_UNIX, SOCK_STREAM, 0);
4597 fprintf(stderr, "socket failed - %s\n", sockpath);
4601 ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
4603 fprintf(stderr, "bind failed - %s\n", sockpath);
4607 ret = listen(fd, 10);
4609 fprintf(stderr, "listen failed\n");
4613 DEBUG(DEBUG_INFO, ("Socket init done\n"));
4624 static struct options {
4626 const char *sockpath;
4627 const char *pidfile;
4628 const char *debuglevel;
4631 static struct poptOption cmdline_options[] = {
4633 { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
4634 "Database directory", "directory" },
4635 { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
4636 "Unix domain socket path", "filename" },
4637 { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
4638 "pid file", "filename" } ,
4639 { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
4640 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
4644 static void cleanup(void)
4646 unlink(options.sockpath);
4647 unlink(options.pidfile);
4650 static void signal_handler(int sig)
4656 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
4657 struct ctdbd_context *ctdb, int fd, int pfd)
4659 struct tevent_req *req;
4664 signal(SIGTERM, signal_handler);
4666 req = server_send(mem_ctx, ev, ctdb, fd);
4668 fprintf(stderr, "Memory error\n");
4672 len = write(pfd, &ret, sizeof(ret));
4673 if (len != sizeof(ret)) {
4674 fprintf(stderr, "Failed to send message to parent\n");
4679 tevent_req_poll(req, ev);
4681 server_recv(req, &ret);
4687 int main(int argc, const char *argv[])
4689 TALLOC_CTX *mem_ctx;
4690 struct ctdbd_context *ctdb;
4691 struct tevent_context *ev;
4693 int opt, fd, ret, pfd[2];
4698 pc = poptGetContext(argv[0], argc, argv, cmdline_options,
4699 POPT_CONTEXT_KEEP_FIRST);
4700 while ((opt = poptGetNextOpt(pc)) != -1) {
4701 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
4705 if (options.dbdir == NULL) {
4706 fprintf(stderr, "Please specify database directory\n");
4707 poptPrintHelp(pc, stdout, 0);
4711 if (options.sockpath == NULL) {
4712 fprintf(stderr, "Please specify socket path\n");
4713 poptPrintHelp(pc, stdout, 0);
4717 if (options.pidfile == NULL) {
4718 fprintf(stderr, "Please specify pid file\n");
4719 poptPrintHelp(pc, stdout, 0);
4723 mem_ctx = talloc_new(NULL);
4724 if (mem_ctx == NULL) {
4725 fprintf(stderr, "Memory error\n");
4729 ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4731 fprintf(stderr, "Invalid debug level\n");
4732 poptPrintHelp(pc, stdout, 0);
4736 ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4741 if (! ctdbd_verify(ctdb)) {
4745 ev = tevent_context_init(mem_ctx);
4747 fprintf(stderr, "Memory error\n");
4751 fd = socket_init(options.sockpath);
4758 fprintf(stderr, "Failed to create pipe\n");
4765 fprintf(stderr, "Failed to fork\n");
4773 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4780 len = read(pfd[0], &ret, sizeof(ret));
4782 if (len != sizeof(ret)) {
4783 fprintf(stderr, "len = %zi\n", len);
4784 fprintf(stderr, "Failed to get message from child\n");
4789 fp = fopen(options.pidfile, "w");
4791 fprintf(stderr, "Failed to open pid file %s\n",
4796 fprintf(fp, "%d\n", pid);