/*
   ctdb main protocol code

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
22 #include "lib/util/dlinklist.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
28 choose the transport we will use
30 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
32 ctdb->transport = talloc_strdup(ctdb, transport);
33 CTDB_NO_MEMORY(ctdb, ctdb->transport);
39 Check whether an ip is a valid node ip
40 Returns the node id for this ip address or -1
42 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const ctdb_sock_addr *nodeip)
46 for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
47 if (ctdb->nodes[nodeid]->flags & NODE_FLAGS_DELETED) {
50 if (ctdb_same_ip(&ctdb->nodes[nodeid]->address, nodeip)) {
59 choose the recovery lock file
61 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
63 if (ctdb->recovery_lock_file != NULL) {
64 talloc_free(ctdb->recovery_lock_file);
65 ctdb->recovery_lock_file = NULL;
69 DEBUG(DEBUG_ALERT,("Recovery lock file set to \"\". Disabling recovery lock checking\n"));
73 ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
74 CTDB_NO_MEMORY(ctdb, ctdb->recovery_lock_file);
79 /* Load a nodes list file into a nodes array */
80 static int convert_node_map_to_list(struct ctdb_context *ctdb,
82 struct ctdb_node_map *node_map,
83 struct ctdb_node ***nodes,
88 *nodes = talloc_zero_array(mem_ctx,
89 struct ctdb_node *, node_map->num);
90 CTDB_NO_MEMORY(ctdb, *nodes);
91 *num_nodes = node_map->num;
93 for (i = 0; i < node_map->num; i++) {
94 struct ctdb_node *node;
96 node = talloc_zero(*nodes, struct ctdb_node);
97 CTDB_NO_MEMORY(ctdb, node);
100 node->address = node_map->nodes[i].addr;
101 node->name = talloc_asprintf(node, "%s:%u",
102 ctdb_addr_to_str(&node->address),
103 ctdb_addr_to_port(&node->address));
105 node->flags = node_map->nodes[i].flags;
106 if (!(node->flags & NODE_FLAGS_DELETED)) {
107 node->flags = NODE_FLAGS_UNHEALTHY;
109 node->flags |= NODE_FLAGS_DISCONNECTED;
113 node->dead_count = 0;
119 /* Load the nodes list from a file */
120 void ctdb_load_nodes_file(struct ctdb_context *ctdb)
122 struct ctdb_node_map *node_map;
125 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
126 if (node_map == NULL) {
130 TALLOC_FREE(ctdb->nodes);
131 ret = convert_node_map_to_list(ctdb, ctdb, node_map,
132 &ctdb->nodes, &ctdb->num_nodes);
137 talloc_free(node_map);
141 DEBUG(DEBUG_ERR, ("Failed to load nodes file \"%s\"\n",
143 talloc_free(node_map);
148 setup the local node address
150 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
152 ctdb->address = talloc(ctdb, ctdb_sock_addr);
153 CTDB_NO_MEMORY(ctdb, ctdb->address);
155 if (ctdb_parse_address(ctdb, address, ctdb->address) != 0) {
159 ctdb->name = talloc_asprintf(ctdb, "%s:%u",
160 ctdb_addr_to_str(ctdb->address),
161 ctdb_addr_to_port(ctdb->address));
167 return the number of active nodes
169 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
173 for (i=0; i < ctdb->num_nodes; i++) {
174 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_INACTIVE)) {
183 called when we need to process a packet. This can be a requeued packet
184 after a lockwait, or a real packet from another node
186 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
190 /* place the packet as a child of the tmp_ctx. We then use
191 talloc_free() below to free it. If any of the calls want
192 to keep it, then they will steal it somewhere else, and the
193 talloc_free() will only free the tmp_ctx */
194 tmp_ctx = talloc_new(ctdb);
195 talloc_steal(tmp_ctx, hdr);
197 DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
198 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
199 hdr->srcnode, hdr->destnode));
201 switch (hdr->operation) {
203 case CTDB_REPLY_CALL:
204 case CTDB_REQ_DMASTER:
205 case CTDB_REPLY_DMASTER:
206 /* we dont allow these calls when banned */
207 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
208 DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
210 " length %u from node %u to %u while node"
212 hdr->operation, hdr->reqid,
214 hdr->srcnode, hdr->destnode));
218 /* Push the check for generation in the handlers for these
219 * operations. Check database generation instead of global
220 * generation. Since the database context is not available
221 * here, push the check in the operations.
225 switch (hdr->operation) {
227 CTDB_INCREMENT_STAT(ctdb, node.req_call);
228 ctdb_request_call(ctdb, hdr);
231 case CTDB_REPLY_CALL:
232 CTDB_INCREMENT_STAT(ctdb, node.reply_call);
233 ctdb_reply_call(ctdb, hdr);
236 case CTDB_REPLY_ERROR:
237 CTDB_INCREMENT_STAT(ctdb, node.reply_error);
238 ctdb_reply_error(ctdb, hdr);
241 case CTDB_REQ_DMASTER:
242 CTDB_INCREMENT_STAT(ctdb, node.req_dmaster);
243 ctdb_request_dmaster(ctdb, hdr);
246 case CTDB_REPLY_DMASTER:
247 CTDB_INCREMENT_STAT(ctdb, node.reply_dmaster);
248 ctdb_reply_dmaster(ctdb, hdr);
251 case CTDB_REQ_MESSAGE:
252 CTDB_INCREMENT_STAT(ctdb, node.req_message);
253 ctdb_request_message(ctdb, hdr);
256 case CTDB_REQ_CONTROL:
257 CTDB_INCREMENT_STAT(ctdb, node.req_control);
258 ctdb_request_control(ctdb, hdr);
261 case CTDB_REPLY_CONTROL:
262 CTDB_INCREMENT_STAT(ctdb, node.reply_control);
263 ctdb_reply_control(ctdb, hdr);
266 case CTDB_REQ_KEEPALIVE:
267 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_recv);
271 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n",
272 __location__, hdr->operation));
277 talloc_free(tmp_ctx);
282 called by the transport layer when a node is dead
284 void ctdb_node_dead(struct ctdb_node *node)
286 if (node->flags & NODE_FLAGS_DISCONNECTED) {
287 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n",
288 node->ctdb->name, node->name,
289 node->ctdb->num_connected));
292 node->ctdb->num_connected--;
293 node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
295 node->dead_count = 0;
297 DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n",
298 node->ctdb->name, node->name, node->ctdb->num_connected));
299 ctdb_daemon_cancel_controls(node->ctdb, node);
301 if (node->ctdb->methods == NULL) {
302 DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
306 node->ctdb->methods->restart(node);
310 called by the transport layer when a node is connected
312 void ctdb_node_connected(struct ctdb_node *node)
314 if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
315 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n",
316 node->ctdb->name, node->name,
317 node->ctdb->num_connected));
320 node->ctdb->num_connected++;
321 node->dead_count = 0;
322 node->flags &= ~NODE_FLAGS_DISCONNECTED;
323 node->flags |= NODE_FLAGS_UNHEALTHY;
325 ("%s: connected to %s - %u connected\n",
326 node->ctdb->name, node->name, node->ctdb->num_connected));
/*
  State for a packet sent to ourselves and deferred to the next event
  loop iteration.
*/
struct queue_next {
	struct ctdb_context *ctdb;	/* context to process the packet in */
	struct ctdb_req_header *hdr;	/* the packet to process */
};
336 triggered when a deferred packet is due
338 static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
339 struct timeval t, void *private_data)
341 struct queue_next *q = talloc_get_type(private_data, struct queue_next);
342 ctdb_input_pkt(q->ctdb, q->hdr);
347 defer a packet, so it is processed on the next event loop
348 this is used for sending packets to ourselves
350 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
352 struct queue_next *q;
353 q = talloc(ctdb, struct queue_next);
355 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
359 q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
360 if (q->hdr == NULL) {
361 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
365 /* use this to put packets directly into our recv function */
366 ctdb_input_pkt(q->ctdb, q->hdr);
368 event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
374 broadcast a packet to all nodes
376 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb,
377 struct ctdb_req_header *hdr)
380 for (i=0; i < ctdb->num_nodes; i++) {
381 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
384 hdr->destnode = ctdb->nodes[i]->pnn;
385 ctdb_queue_packet(ctdb, hdr);
390 broadcast a packet to all nodes in the current vnnmap
392 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb,
393 struct ctdb_req_header *hdr)
396 for (i=0;i<ctdb->vnn_map->size;i++) {
397 hdr->destnode = ctdb->vnn_map->map[i];
398 ctdb_queue_packet(ctdb, hdr);
403 broadcast a packet to all connected nodes
405 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb,
406 struct ctdb_req_header *hdr)
409 for (i=0; i < ctdb->num_nodes; i++) {
410 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
413 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
414 hdr->destnode = ctdb->nodes[i]->pnn;
415 ctdb_queue_packet(ctdb, hdr);
421 queue a packet or die
423 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
425 struct ctdb_node *node;
427 switch (hdr->destnode) {
428 case CTDB_BROADCAST_ALL:
429 ctdb_broadcast_packet_all(ctdb, hdr);
431 case CTDB_BROADCAST_VNNMAP:
432 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
434 case CTDB_BROADCAST_CONNECTED:
435 ctdb_broadcast_packet_connected(ctdb, hdr);
439 CTDB_INCREMENT_STAT(ctdb, node_packets_sent);
441 if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
442 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n",
447 node = ctdb->nodes[hdr->destnode];
449 if (node->flags & NODE_FLAGS_DELETED) {
450 DEBUG(DEBUG_ERR, (__location__ " Can not queue packet to DELETED node %d\n", hdr->destnode));
454 if (node->pnn == ctdb->pnn) {
455 ctdb_defer_packet(ctdb, hdr);
459 if (ctdb->methods == NULL) {
460 DEBUG(DEBUG_ALERT, (__location__ " Can not queue packet. "
461 "Transport is DOWN\n"));
466 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
467 ctdb_fatal(ctdb, "Unable to queue packet\n");
/*
  A valgrind hack that yields opcode specific backtraces: every opcode
  reaches ctdb_queue_packet() from its own source line.  Very ugly, and
  it relies on no compiler optimisation!

  Whatever the opcode, the packet is queued exactly once.

  NOTE(review): the switch statement and the per-opcode lines are
  missing from this extract; the numbering gap leaves room for exactly
  100 of them, restored here as DO_OP(1) .. DO_OP(100) - confirm.
*/
void ctdb_queue_packet_opcode(struct ctdb_context *ctdb, struct ctdb_req_header *hdr, unsigned opcode)
{
	switch (opcode) {
#define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
		DO_OP(1);
		DO_OP(2);
		DO_OP(3);
		DO_OP(4);
		DO_OP(5);
		DO_OP(6);
		DO_OP(7);
		DO_OP(8);
		DO_OP(9);
		DO_OP(10);
		DO_OP(11);
		DO_OP(12);
		DO_OP(13);
		DO_OP(14);
		DO_OP(15);
		DO_OP(16);
		DO_OP(17);
		DO_OP(18);
		DO_OP(19);
		DO_OP(20);
		DO_OP(21);
		DO_OP(22);
		DO_OP(23);
		DO_OP(24);
		DO_OP(25);
		DO_OP(26);
		DO_OP(27);
		DO_OP(28);
		DO_OP(29);
		DO_OP(30);
		DO_OP(31);
		DO_OP(32);
		DO_OP(33);
		DO_OP(34);
		DO_OP(35);
		DO_OP(36);
		DO_OP(37);
		DO_OP(38);
		DO_OP(39);
		DO_OP(40);
		DO_OP(41);
		DO_OP(42);
		DO_OP(43);
		DO_OP(44);
		DO_OP(45);
		DO_OP(46);
		DO_OP(47);
		DO_OP(48);
		DO_OP(49);
		DO_OP(50);
		DO_OP(51);
		DO_OP(52);
		DO_OP(53);
		DO_OP(54);
		DO_OP(55);
		DO_OP(56);
		DO_OP(57);
		DO_OP(58);
		DO_OP(59);
		DO_OP(60);
		DO_OP(61);
		DO_OP(62);
		DO_OP(63);
		DO_OP(64);
		DO_OP(65);
		DO_OP(66);
		DO_OP(67);
		DO_OP(68);
		DO_OP(69);
		DO_OP(70);
		DO_OP(71);
		DO_OP(72);
		DO_OP(73);
		DO_OP(74);
		DO_OP(75);
		DO_OP(76);
		DO_OP(77);
		DO_OP(78);
		DO_OP(79);
		DO_OP(80);
		DO_OP(81);
		DO_OP(82);
		DO_OP(83);
		DO_OP(84);
		DO_OP(85);
		DO_OP(86);
		DO_OP(87);
		DO_OP(88);
		DO_OP(89);
		DO_OP(90);
		DO_OP(91);
		DO_OP(92);
		DO_OP(93);
		DO_OP(94);
		DO_OP(95);
		DO_OP(96);
		DO_OP(97);
		DO_OP(98);
		DO_OP(99);
		DO_OP(100);
	default:
		ctdb_queue_packet(ctdb, hdr);
		break;
	}
}