2 ctdb main protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "../tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
29 choose the transport we will use
/*
  Record which transport to use.
  A talloc_strdup() copy of transport, parented to ctdb, is stored in
  ctdb->transport.
  NOTE(review): no check of the copy for NULL is visible, and the return
  statement is not visible in this listing - confirm what callers get.
*/
31 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
33 ctdb->transport = talloc_strdup(ctdb, transport);
38 choose the recovery lock file
/*
  Record the path of the recovery lock file.
  A talloc_strdup() copy of file, parented to ctdb, is stored in
  ctdb->recovery_lock_file. The file itself is not opened here.
  NOTE(review): no check of the copy for NULL is visible, and the return
  statement is not visible in this listing - confirm what callers get.
*/
40 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
42 ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
47 choose the logfile location
/*
  Record the log file location and open it.
  A talloc_strdup() copy of logfile is stored in ctdb->logfile. Unless
  the name is a single dash, the file is opened write-only in append
  mode and created (requested mode 0666) if it does not exist.
  NOTE(review): the declaration of fd, the failure test around the
  printf, and whatever is done with fd afterwards are not visible in
  this listing - confirm against the full source.
*/
49 int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile)
51 ctdb->logfile = talloc_strdup(ctdb, logfile);
/* a failed strdup or a name of a single dash skips the open entirely */
52 if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) {
54 fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666);
/* the failure is reported with plain printf rather than DEBUG */
56 printf("Failed to open logfile %s\n", ctdb->logfile);
65 /* also catch stderr of subcommands to the log file */
73 set the directory for the local databases
/*
  Record the directory that holds the local databases.
  A talloc_strdup() copy of dir, parented to ctdb, is stored in
  ctdb->db_directory, and the copy is checked for NULL.
  NOTE(review): the body of the NULL branch and the return statements
  are not visible in this listing - presumably an error is returned on
  allocation failure; confirm.
*/
75 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
77 ctdb->db_directory = talloc_strdup(ctdb, dir);
78 if (ctdb->db_directory == NULL) {
85 add a node to the list of active nodes
/*
  Append one node, described by the address string nstr, to ctdb->nodes.

  Visible steps: the pointer array is grown to num_nodes+1 entries, a
  zeroed struct ctdb_node is allocated as a talloc child of the array,
  nstr is parsed into node->address, a printable name is built, the vnn
  is set to the current node count, and the node starts out flagged as
  disconnected unless its address is the local address.

  NOTE(review): the lines that assign node, store the grown array back
  into ctdb->nodes, increment ctdb->num_nodes and return are not visible
  in this listing - confirm against the full source.
*/
87 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
89 struct ctdb_node *node, **nodep;
/* grow the array of node pointers by one slot */
91 nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
/* CTDB_NO_MEMORY is defined elsewhere; presumably it bails out on NULL */
92 CTDB_NO_MEMORY(ctdb, nodep);
/* nodep is reused: it now points at the new last slot of the array */
95 nodep = &ctdb->nodes[ctdb->num_nodes];
96 (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
97 CTDB_NO_MEMORY(ctdb, *nodep);
100 if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
/* the second format argument (for %u) is not visible in this listing */
104 node->name = talloc_asprintf(node, "%s:%u",
105 node->address.address,
107 /* this assumes that the nodes are kept in sorted order, and no gaps */
108 node->vnn = ctdb->num_nodes;
110 /* nodes start out disconnected */
111 node->flags |= NODE_FLAGS_DISCONNECTED;
/* if a local address is set and this entry matches it, this entry is us:
   remember our own vnn and treat ourselves as connected */
113 if (ctdb->address.address &&
114 ctdb_same_address(&ctdb->address, &node->address)) {
115 ctdb->vnn = node->vnn;
116 node->flags &= ~NODE_FLAGS_DISCONNECTED;
120 node->dead_count = 0;
126 setup the node list from a file
/*
  Load the node list from the file nlist and build the initial vnn map.

  The previous ctdb->node_list_file is freed and replaced by a copy of
  nlist. The file is read into an array of lines, trailing empty lines
  are ignored, and each remaining line is passed to ctdb_add_node().
  Afterwards ctdb->vnn_map is allocated with generation 1, a size equal
  to ctdb->num_nodes, and an identity mapping (map[i] = i).

  NOTE(review): the local declarations, the failure test on lines, the
  body of the while loop, the error path of the for loop and the return
  statements are not visible in this listing - confirm against the full
  source.
*/
128 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
134 talloc_free(ctdb->node_list_file);
135 ctdb->node_list_file = talloc_strdup(ctdb, nlist);
/* the loaded lines are allocated with ctdb as the talloc context */
137 lines = file_lines_load(nlist, &nlines, ctdb);
139 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
/* drop trailing empty lines; presumably the missing body decrements nlines */
142 while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
/* one node per remaining line, in file order */
146 for (i=0;i<nlines;i++) {
147 if (ctdb_add_node(ctdb, lines[i]) != 0) {
153 /* initialize the vnn mapping table now that we have num_nodes setup */
155 XXX we currently initialize it to the maximum number of nodes to
156 XXX make it behave the same way as previously.
157 XXX Once we have recovery working we should initialize this always to
158 XXX generation==0 (==invalid) and let the recovery tool populate this
159 XXX table for the daemons.
161 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
162 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
164 ctdb->vnn_map->generation = 1;
165 ctdb->vnn_map->size = ctdb->num_nodes;
166 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
167 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
/* identity mapping: slot i of the map refers to node i */
169 for(i=0;i<ctdb->vnn_map->size;i++) {
170 ctdb->vnn_map->map[i] = i;
179 setup the local node address
/*
  Set the address of the local node.
  The string address is parsed into ctdb->address with
  ctdb_parse_address(), and a printable name is then built into
  ctdb->name with talloc_asprintf().
  NOTE(review): the error branch body, the second format argument (for
  %u) and the return statements are not visible in this listing -
  confirm against the full source.
*/
181 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
183 if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
187 ctdb->name = talloc_asprintf(ctdb, "%s:%u",
188 ctdb->address.address,
195 return the number of active nodes
/*
  Return the number of active nodes.
  Walks every entry of ctdb->vnn_map, looks up the node that the entry
  refers to, and tests that NODE_FLAGS_INACTIVE is not set on it. Nodes
  that are not in the vnn map are never examined.
  NOTE(review): the counter declaration, its increment and the return
  statement are not visible in this listing - presumably each node that
  passes the test is counted; confirm.
*/
197 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
201 for (i=0;i<ctdb->vnn_map->size;i++) {
/* map[i] is used directly as an index into ctdb->nodes */
202 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
203 if (!(node->flags & NODE_FLAGS_INACTIVE)) {
212 called when we need to process a packet. This can be a requeued packet
213 after a lockwait, or a real packet from another node
/*
  Process one incoming packet and dispatch it on hdr->operation.

  Ownership: hdr is moved under a temporary talloc context that is freed
  at the end of the function, so a handler that needs the packet to
  outlive this call has to steal it elsewhere.

  Flow visible in this listing:
   - the request is logged at debug level 3
   - for the call and dmaster operations, the generation carried in the
     header is compared with ctdb->vnn_map->generation and a mismatch is
     logged at level 0
   - a second switch bumps the matching statistics counter and calls the
     handler for the operation; a keepalive only bumps its counter
   - an unrecognised operation is logged at level 0

  NOTE(review): the declaration of tmp_ctx, several case labels, the
  break statements and the action taken after a generation mismatch are
  not visible in this listing. The comment that begins with the words
  for ctdb_call inter-node operations also has no closing delimiter
  here. Confirm all of these against the full source.
*/
215 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
219 /* place the packet as a child of the tmp_ctx. We then use
220 talloc_free() below to free it. If any of the calls want
221 to keep it, then they will steal it somewhere else, and the
222 talloc_free() will only free the tmp_ctx */
223 tmp_ctx = talloc_new(ctdb);
224 talloc_steal(tmp_ctx, hdr);
226 DEBUG(3,(__location__ " ctdb request %u of type %u length %u from "
227 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
228 hdr->srcnode, hdr->destnode));
/* first switch: generation check for the inter-node call operations */
230 switch (hdr->operation) {
232 case CTDB_REPLY_CALL:
233 case CTDB_REQ_DMASTER:
234 case CTDB_REPLY_DMASTER:
235 /* for ctdb_call inter-node operations verify that the
236 remote node that sent us the call is running in the
237 same generation instance as this node
239 if (ctdb->vnn_map->generation != hdr->generation) {
240 DEBUG(0,(__location__ " ctdb request %u"
241 " length %u from node %u to %u had an"
242 " invalid generation id:%u while our"
243 " generation id is:%u\n",
244 hdr->reqid, hdr->length,
245 hdr->srcnode, hdr->destnode,
246 hdr->generation, ctdb->vnn_map->generation));
251 switch (hdr->operation) {
253 ctdb->statistics.node.req_call++;
254 ctdb_request_call(ctdb, hdr);
257 case CTDB_REPLY_CALL:
258 ctdb->statistics.node.reply_call++;
259 ctdb_reply_call(ctdb, hdr);
262 case CTDB_REPLY_ERROR:
263 ctdb->statistics.node.reply_error++;
264 ctdb_reply_error(ctdb, hdr);
267 case CTDB_REQ_DMASTER:
268 ctdb->statistics.node.req_dmaster++;
269 ctdb_request_dmaster(ctdb, hdr);
272 case CTDB_REPLY_DMASTER:
273 ctdb->statistics.node.reply_dmaster++;
274 ctdb_reply_dmaster(ctdb, hdr);
277 case CTDB_REQ_MESSAGE:
278 ctdb->statistics.node.req_message++;
279 ctdb_request_message(ctdb, hdr);
282 case CTDB_REQ_CONTROL:
283 ctdb->statistics.node.req_control++;
284 ctdb_request_control(ctdb, hdr);
287 case CTDB_REPLY_CONTROL:
288 ctdb->statistics.node.reply_control++;
289 ctdb_reply_control(ctdb, hdr);
292 case CTDB_REQ_KEEPALIVE:
293 ctdb->statistics.keepalive_packets_recv++;
297 DEBUG(0,("%s: Packet with unknown operation %u\n",
298 __location__, hdr->operation));
303 talloc_free(tmp_ctx);
308 called by the transport layer when a node is dead
/*
  Mark a node as disconnected.
  If the node already carries NODE_FLAGS_DISCONNECTED this is only
  logged. Otherwise the connected count of the owning ctdb context is
  decremented, the flag is set, dead_count is reset to zero, the event
  is logged, and ctdb_daemon_cancel_controls() is called for the node.
  NOTE(review): the end of the already-disconnected branch is not
  visible in this listing - presumably it returns early so that
  num_connected is not decremented twice; confirm.
*/
310 void ctdb_node_dead(struct ctdb_node *node)
312 if (node->flags & NODE_FLAGS_DISCONNECTED) {
313 DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n",
314 node->ctdb->name, node->name,
315 node->ctdb->num_connected));
318 node->ctdb->num_connected--;
319 node->flags |= NODE_FLAGS_DISCONNECTED;
321 node->dead_count = 0;
322 DEBUG(1,("%s: node %s is dead: %u connected\n",
323 node->ctdb->name, node->name, node->ctdb->num_connected));
/* controls still outstanding towards this node are handed to the cancel helper */
324 ctdb_daemon_cancel_controls(node->ctdb, node);
328 called by the transport layer when a node is connected
/*
  Mark a node as connected.
  If NODE_FLAGS_DISCONNECTED is already clear this is only logged.
  Otherwise the connected count of the owning ctdb context is
  incremented, dead_count is reset to zero, the flag is cleared and the
  event is logged.
  NOTE(review): the end of the already-connected branch is not visible
  in this listing - presumably it returns early so that num_connected is
  not incremented twice; confirm.
*/
330 void ctdb_node_connected(struct ctdb_node *node)
332 if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
333 DEBUG(1,("%s: node %s is already marked connected: %u connected\n",
334 node->ctdb->name, node->name,
335 node->ctdb->num_connected));
338 node->ctdb->num_connected++;
339 node->dead_count = 0;
340 node->flags &= ~NODE_FLAGS_DISCONNECTED;
341 DEBUG(1,("%s: connected to %s - %u connected\n",
342 node->ctdb->name, node->name, node->ctdb->num_connected));
/*
  NOTE(review): these two declarations look like the members of
  struct queue_next (the deferred-packet record accessed through q->ctdb
  and q->hdr in the functions below); the line that opens the struct is
  not visible in this listing - confirm.
*/
/* context the deferred packet will be delivered to */
346 struct ctdb_context *ctdb;
/* the deferred packet itself */
347 struct ctdb_req_header *hdr;
352 triggered when a deferred packet is due
/*
  Timed-event callback that delivers a deferred packet.
  private_data is checked and converted to a struct queue_next with
  talloc_get_type(); the stored packet is then passed to
  ctdb_input_pkt() on the stored context. The ev, te and t arguments are
  not used in the visible lines.
  NOTE(review): no release of q is visible in this listing - confirm
  that the record is freed once the packet has been delivered.
*/
354 static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
355 struct timeval t, void *private_data)
357 struct queue_next *q = talloc_get_type(private_data, struct queue_next);
358 ctdb_input_pkt(q->ctdb, q->hdr);
363 defer a packet, so it is processed on the next event loop
364 this is used for sending packets to ourselves
/*
  Defer a packet addressed to ourselves.
  A struct queue_next record is allocated on ctdb and the packet is
  duplicated (hdr->length bytes) with talloc_memdup(), so the caller
  keeps ownership of the original hdr. Note that the copy is parented to
  ctdb, not to the record. A zero-delay timed event, allocated as a
  child of the record, is registered to run queue_next_trigger().

  NOTE(review): as listed, the copy is both passed straight to
  ctdb_input_pkt() and scheduled through the timed event, which would
  process it twice. Presumably conditional-compilation lines that select
  only one of the two paths are missing from this listing - confirm
  against the full source. The allocation-failure test on q, the
  assignment of q->ctdb and the error-path returns are not visible
  either.
*/
366 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
368 struct queue_next *q;
369 q = talloc(ctdb, struct queue_next);
371 DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
375 q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
376 if (q->hdr == NULL) {
377 DEBUG(0,("Error copying deferred packet to self\n"));
381 /* use this to put packets directly into our recv function */
382 ctdb_input_pkt(q->ctdb, q->hdr);
/* timeval_zero() asks for the event to fire as soon as the loop runs */
384 event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
390 broadcast a packet to all nodes
/*
  Send hdr to every node in ctdb->nodes, whatever the node flags are.
  The same header is reused for each send: hdr->destnode is overwritten
  with the vnn of each node in turn before ctdb_queue_packet() is
  called, so on return it holds the last destination, not the broadcast
  address it came in with.
*/
392 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb,
393 struct ctdb_req_header *hdr)
396 for (i=0;i<ctdb->num_nodes;i++) {
397 hdr->destnode = ctdb->nodes[i]->vnn;
398 ctdb_queue_packet(ctdb, hdr);
403 broadcast a packet to all nodes in the current vnnmap
/*
  Send hdr to every node listed in the current vnn map.
  The same header is reused for each send: hdr->destnode is overwritten
  with each map entry in turn before ctdb_queue_packet() is called, so
  on return it holds the last destination, not the broadcast address it
  came in with.
*/
405 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb,
406 struct ctdb_req_header *hdr)
409 for (i=0;i<ctdb->vnn_map->size;i++) {
410 hdr->destnode = ctdb->vnn_map->map[i];
411 ctdb_queue_packet(ctdb, hdr);
416 broadcast a packet to all connected nodes
/*
  Send hdr to every node that is not flagged NODE_FLAGS_DISCONNECTED.
  The same header is reused for each send: hdr->destnode is overwritten
  with the vnn of each connected node before ctdb_queue_packet() is
  called, so on return it may hold the last destination rather than the
  broadcast address it came in with.
*/
418 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb,
419 struct ctdb_req_header *hdr)
422 for (i=0;i<ctdb->num_nodes;i++) {
/* skip nodes we currently have no connection to */
423 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
424 hdr->destnode = ctdb->nodes[i]->vnn;
425 ctdb_queue_packet(ctdb, hdr);
431 queue a packet or die
/*
  Queue a packet for delivery to hdr->destnode, or die trying.

  The three broadcast pseudo-destinations are expanded by the matching
  broadcast helper, which calls back into this function once per real
  destination. For a real destination the sent-packet counter is
  bumped, the destination is validated with ctdb_validate_vnn(), and
  the packet is then either deferred (when the destination is this
  node) or handed to the queue_pkt method of the transport. A failure
  to queue on the transport is treated as fatal through ctdb_fatal().

  NOTE(review): the returns after the broadcast cases and after the
  validation failure, the last DEBUG argument, and the else that
  separates the defer path from the transport path are not visible in
  this listing - confirm against the full source.
*/
433 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
435 struct ctdb_node *node;
437 switch (hdr->destnode) {
438 case CTDB_BROADCAST_ALL:
439 ctdb_broadcast_packet_all(ctdb, hdr);
441 case CTDB_BROADCAST_VNNMAP:
442 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
444 case CTDB_BROADCAST_CONNECTED:
445 ctdb_broadcast_packet_connected(ctdb, hdr);
/* counted before validation, so rejected packets are included too */
449 ctdb->statistics.node_packets_sent++;
451 if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
452 DEBUG(0,(__location__ " cant send to node %u that does not exist\n",
/* destnode is used directly as the index into the node array */
457 node = ctdb->nodes[hdr->destnode];
/* packets to ourselves do not go through the transport */
459 if (hdr->destnode == ctdb->vnn) {
460 ctdb_defer_packet(ctdb, hdr);
463 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
464 ctdb_fatal(ctdb, "Unable to queue packet\n");