Merge branch 'master' of ssh://git.samba.org/data/git/samba into noejs
[amitay/samba.git] / source4 / cluster / ctdb / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "../tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27
28 /*
29   choose the transport we will use
30 */
31 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
32 {
33         ctdb->transport = talloc_strdup(ctdb, transport);
34         return 0;
35 }
36
37 /*
38   choose the recovery lock file
39 */
40 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
41 {
42         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
43         return 0;
44 }
45
46 /*
47   choose the logfile location
48 */
49 int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile)
50 {
51         ctdb->logfile = talloc_strdup(ctdb, logfile);
52         if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) {
53                 int fd;
54                 fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666);
55                 if (fd == -1) {
56                         printf("Failed to open logfile %s\n", ctdb->logfile);
57                         abort();
58                 }
59                 close(1);
60                 close(2);
61                 if (fd != 1) {
62                         dup2(fd, 1);
63                         close(fd);
64                 }
65                 /* also catch stderr of subcommands to the log file */
66                 dup2(1, 2);
67         }
68         return 0;
69 }
70
71
72 /*
73   set the directory for the local databases
74 */
75 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
76 {
77         ctdb->db_directory = talloc_strdup(ctdb, dir);
78         if (ctdb->db_directory == NULL) {
79                 return -1;
80         }
81         return 0;
82 }
83
84 /*
85   add a node to the list of active nodes
86 */
87 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
88 {
89         struct ctdb_node *node, **nodep;
90
91         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
92         CTDB_NO_MEMORY(ctdb, nodep);
93
94         ctdb->nodes = nodep;
95         nodep = &ctdb->nodes[ctdb->num_nodes];
96         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
97         CTDB_NO_MEMORY(ctdb, *nodep);
98         node = *nodep;
99
100         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
101                 return -1;
102         }
103         node->ctdb = ctdb;
104         node->name = talloc_asprintf(node, "%s:%u", 
105                                      node->address.address, 
106                                      node->address.port);
107         /* this assumes that the nodes are kept in sorted order, and no gaps */
108         node->vnn = ctdb->num_nodes;
109
110         /* nodes start out disconnected */
111         node->flags |= NODE_FLAGS_DISCONNECTED;
112
113         if (ctdb->address.address &&
114             ctdb_same_address(&ctdb->address, &node->address)) {
115                 ctdb->vnn = node->vnn;
116                 node->flags &= ~NODE_FLAGS_DISCONNECTED;
117         }
118
119         ctdb->num_nodes++;
120         node->dead_count = 0;
121
122         return 0;
123 }
124
125 /*
126   setup the node list from a file
127 */
128 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
129 {
130         char **lines;
131         int nlines;
132         int i;
133
134         talloc_free(ctdb->node_list_file);
135         ctdb->node_list_file = talloc_strdup(ctdb, nlist);
136
137         lines = file_lines_load(nlist, &nlines, ctdb);
138         if (lines == NULL) {
139                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
140                 return -1;
141         }
142         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
143                 nlines--;
144         }
145
146         for (i=0;i<nlines;i++) {
147                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
148                         talloc_free(lines);
149                         return -1;
150                 }
151         }
152
153         /* initialize the vnn mapping table now that we have num_nodes setup */
154 /*
155 XXX we currently initialize it to the maximum number of nodes to 
156 XXX make it behave the same way as previously.  
157 XXX Once we have recovery working we should initialize this always to 
158 XXX generation==0 (==invalid) and let the recovery tool populate this 
159 XXX table for the daemons. 
160 */
161         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
162         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
163
164         ctdb->vnn_map->generation = 1;
165         ctdb->vnn_map->size = ctdb->num_nodes;
166         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
167         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
168
169         for(i=0;i<ctdb->vnn_map->size;i++) {
170                 ctdb->vnn_map->map[i] = i;
171         }
172         
173         talloc_free(lines);
174         return 0;
175 }
176
177
178 /*
179   setup the local node address
180 */
181 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
182 {
183         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
184                 return -1;
185         }
186         
187         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
188                                      ctdb->address.address, 
189                                      ctdb->address.port);
190         return 0;
191 }
192
193
194 /*
195   return the number of active nodes
196 */
197 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
198 {
199         int i;
200         uint32_t count=0;
201         for (i=0;i<ctdb->vnn_map->size;i++) {
202                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
203                 if (!(node->flags & NODE_FLAGS_INACTIVE)) {
204                         count++;
205                 }
206         }
207         return count;
208 }
209
210
211 /*
212   called when we need to process a packet. This can be a requeued packet
213   after a lockwait, or a real packet from another node
214 */
215 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
216 {
217         TALLOC_CTX *tmp_ctx;
218
219         /* place the packet as a child of the tmp_ctx. We then use
220            talloc_free() below to free it. If any of the calls want
221            to keep it, then they will steal it somewhere else, and the
222            talloc_free() will only free the tmp_ctx */
223         tmp_ctx = talloc_new(ctdb);
224         talloc_steal(tmp_ctx, hdr);
225
226         DEBUG(3,(__location__ " ctdb request %u of type %u length %u from "
227                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
228                  hdr->srcnode, hdr->destnode));
229
230         switch (hdr->operation) {
231         case CTDB_REQ_CALL:
232         case CTDB_REPLY_CALL:
233         case CTDB_REQ_DMASTER:
234         case CTDB_REPLY_DMASTER:
235                 /* for ctdb_call inter-node operations verify that the
236                    remote node that sent us the call is running in the
237                    same generation instance as this node
238                 */
239                 if (ctdb->vnn_map->generation != hdr->generation) {
240                         DEBUG(0,(__location__ " ctdb request %u"
241                                 " length %u from node %u to %u had an"
242                                 " invalid generation id:%u while our"
243                                 " generation id is:%u\n", 
244                                  hdr->reqid, hdr->length, 
245                                  hdr->srcnode, hdr->destnode, 
246                                  hdr->generation, ctdb->vnn_map->generation));
247                         goto done;
248                 }
249         }
250
251         switch (hdr->operation) {
252         case CTDB_REQ_CALL:
253                 ctdb->statistics.node.req_call++;
254                 ctdb_request_call(ctdb, hdr);
255                 break;
256
257         case CTDB_REPLY_CALL:
258                 ctdb->statistics.node.reply_call++;
259                 ctdb_reply_call(ctdb, hdr);
260                 break;
261
262         case CTDB_REPLY_ERROR:
263                 ctdb->statistics.node.reply_error++;
264                 ctdb_reply_error(ctdb, hdr);
265                 break;
266
267         case CTDB_REQ_DMASTER:
268                 ctdb->statistics.node.req_dmaster++;
269                 ctdb_request_dmaster(ctdb, hdr);
270                 break;
271
272         case CTDB_REPLY_DMASTER:
273                 ctdb->statistics.node.reply_dmaster++;
274                 ctdb_reply_dmaster(ctdb, hdr);
275                 break;
276
277         case CTDB_REQ_MESSAGE:
278                 ctdb->statistics.node.req_message++;
279                 ctdb_request_message(ctdb, hdr);
280                 break;
281
282         case CTDB_REQ_CONTROL:
283                 ctdb->statistics.node.req_control++;
284                 ctdb_request_control(ctdb, hdr);
285                 break;
286
287         case CTDB_REPLY_CONTROL:
288                 ctdb->statistics.node.reply_control++;
289                 ctdb_reply_control(ctdb, hdr);
290                 break;
291
292         case CTDB_REQ_KEEPALIVE:
293                 ctdb->statistics.keepalive_packets_recv++;
294                 break;
295
296         default:
297                 DEBUG(0,("%s: Packet with unknown operation %u\n", 
298                          __location__, hdr->operation));
299                 break;
300         }
301
302 done:
303         talloc_free(tmp_ctx);
304 }
305
306
307 /*
308   called by the transport layer when a node is dead
309 */
310 void ctdb_node_dead(struct ctdb_node *node)
311 {
312         if (node->flags & NODE_FLAGS_DISCONNECTED) {
313                 DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n", 
314                          node->ctdb->name, node->name, 
315                          node->ctdb->num_connected));
316                 return;
317         }
318         node->ctdb->num_connected--;
319         node->flags |= NODE_FLAGS_DISCONNECTED;
320         node->rx_cnt = 0;
321         node->dead_count = 0;
322         DEBUG(1,("%s: node %s is dead: %u connected\n", 
323                  node->ctdb->name, node->name, node->ctdb->num_connected));
324         ctdb_daemon_cancel_controls(node->ctdb, node);
325 }
326
327 /*
328   called by the transport layer when a node is connected
329 */
330 void ctdb_node_connected(struct ctdb_node *node)
331 {
332         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
333                 DEBUG(1,("%s: node %s is already marked connected: %u connected\n", 
334                          node->ctdb->name, node->name, 
335                          node->ctdb->num_connected));
336                 return;
337         }
338         node->ctdb->num_connected++;
339         node->dead_count = 0;
340         node->flags &= ~NODE_FLAGS_DISCONNECTED;
341         DEBUG(1,("%s: connected to %s - %u connected\n", 
342                  node->ctdb->name, node->name, node->ctdb->num_connected));
343 }
344
345 struct queue_next {
346         struct ctdb_context *ctdb;
347         struct ctdb_req_header *hdr;
348 };
349
350
351 /*
352   trigered when a deferred packet is due
353  */
354 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
355                                struct timeval t, void *private_data)
356 {
357         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
358         ctdb_input_pkt(q->ctdb, q->hdr);
359         talloc_free(q);
360 }       
361
362 /*
363   defer a packet, so it is processed on the next event loop
364   this is used for sending packets to ourselves
365  */
366 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
367 {
368         struct queue_next *q;
369         q = talloc(ctdb, struct queue_next);
370         if (q == NULL) {
371                 DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
372                 return;
373         }
374         q->ctdb = ctdb;
375         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
376         if (q->hdr == NULL) {
377                 DEBUG(0,("Error copying deferred packet to self\n"));
378                 return;
379         }
380 #if 0
381         /* use this to put packets directly into our recv function */
382         ctdb_input_pkt(q->ctdb, q->hdr);
383 #else
384         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
385 #endif
386 }
387
388
389 /*
390   broadcast a packet to all nodes
391 */
392 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
393                                       struct ctdb_req_header *hdr)
394 {
395         int i;
396         for (i=0;i<ctdb->num_nodes;i++) {
397                 hdr->destnode = ctdb->nodes[i]->vnn;
398                 ctdb_queue_packet(ctdb, hdr);
399         }
400 }
401
402 /*
403   broadcast a packet to all nodes in the current vnnmap
404 */
405 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
406                                          struct ctdb_req_header *hdr)
407 {
408         int i;
409         for (i=0;i<ctdb->vnn_map->size;i++) {
410                 hdr->destnode = ctdb->vnn_map->map[i];
411                 ctdb_queue_packet(ctdb, hdr);
412         }
413 }
414
415 /*
416   broadcast a packet to all connected nodes
417 */
418 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
419                                             struct ctdb_req_header *hdr)
420 {
421         int i;
422         for (i=0;i<ctdb->num_nodes;i++) {
423                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
424                         hdr->destnode = ctdb->nodes[i]->vnn;
425                         ctdb_queue_packet(ctdb, hdr);
426                 }
427         }
428 }
429
430 /*
431   queue a packet or die
432 */
433 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
434 {
435         struct ctdb_node *node;
436
437         switch (hdr->destnode) {
438         case CTDB_BROADCAST_ALL:
439                 ctdb_broadcast_packet_all(ctdb, hdr);
440                 return;
441         case CTDB_BROADCAST_VNNMAP:
442                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
443                 return;
444         case CTDB_BROADCAST_CONNECTED:
445                 ctdb_broadcast_packet_connected(ctdb, hdr);
446                 return;
447         }
448
449         ctdb->statistics.node_packets_sent++;
450
451         if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
452                 DEBUG(0,(__location__ " cant send to node %u that does not exist\n", 
453                          hdr->destnode));
454                 return;
455         }
456
457         node = ctdb->nodes[hdr->destnode];
458
459         if (hdr->destnode == ctdb->vnn) {
460                 ctdb_defer_packet(ctdb, hdr);
461         } else {
462                 node->tx_cnt++;
463                 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
464                         ctdb_fatal(ctdb, "Unable to queue packet\n");
465                 }
466         }
467 }
468
469