split node health monitoring and checking for connected/disconnected
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
32
33 /*
34   handler for when a node changes its flags
35 */
36 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
37                                 TDB_DATA data, void *private_data)
38 {
39         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
40
41         if (data.dsize != sizeof(*c) || !ctdb_validate_pnn(ctdb, c->pnn)) {
42                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
43                 return;
44         }
45
46         if (!ctdb_validate_pnn(ctdb, c->pnn)) {
47                 DEBUG(0,("Bad pnn %u in flag_change_handler\n", c->pnn));
48                 return;
49         }
50
51         /* don't get the disconnected flag from the other node */
52         ctdb->nodes[c->pnn]->flags = 
53                 (ctdb->nodes[c->pnn]->flags&NODE_FLAGS_DISCONNECTED) 
54                 | (c->new_flags & ~NODE_FLAGS_DISCONNECTED);    
55         DEBUG(2,("Node flags for node %u are now 0x%x\n", c->pnn, ctdb->nodes[c->pnn]->flags));
56
57         /* make sure we don't hold any IPs when we shouldn't */
58         if (c->pnn == ctdb->pnn &&
59             (ctdb->nodes[c->pnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
60                 ctdb_release_all_ips(ctdb);
61         }
62 }
63
64 static void print_exit_message(void)
65 {
66         DEBUG(0,("CTDB daemon shutting down\n"));
67 }
68
69
70 /* called when the "startup" event script has finished */
71 static void ctdb_start_transport(struct ctdb_context *ctdb)
72 {
73         /* start the transport running */
74         if (ctdb->methods->start(ctdb) != 0) {
75                 DEBUG(0,("transport failed to start!\n"));
76                 ctdb_fatal(ctdb, "transport failed to start");
77         }
78
79         /* start the recovery daemon process */
80         if (ctdb_start_recoverd(ctdb) != 0) {
81                 DEBUG(0,("Failed to start recovery daemon\n"));
82                 exit(11);
83         }
84
85         /* Make sure we log something when the daemon terminates */
86         atexit(print_exit_message);
87
88         /* a handler for when nodes are disabled/enabled */
89         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
90                                       flag_change_handler, NULL);
91
92         /* start monitoring for connected/disconnected nodes */
93         ctdb_start_keepalive(ctdb);
94
95         /* start monitoring for node health */
96         ctdb_start_monitoring(ctdb);
97
98         /* start periodic update of tcp tickle lists */
99         ctdb_start_tcp_tickle_update(ctdb);
100 }
101
102 static void block_signal(int signum)
103 {
104         struct sigaction act;
105
106         memset(&act, 0, sizeof(act));
107
108         act.sa_handler = SIG_IGN;
109         sigemptyset(&act.sa_mask);
110         sigaddset(&act.sa_mask, signum);
111         sigaction(signum, &act, NULL);
112 }
113
114
115 /*
116   send a packet to a client
117  */
118 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
119 {
120         client->ctdb->statistics.client_packets_sent++;
121         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
122 }
123
124 /*
125   message handler for when we are in daemon mode. This redirects the message
126   to the right client
127  */
128 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
129                                     TDB_DATA data, void *private_data)
130 {
131         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
132         struct ctdb_req_message *r;
133         int len;
134
135         /* construct a message to send to the client containing the data */
136         len = offsetof(struct ctdb_req_message, data) + data.dsize;
137         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
138                                len, struct ctdb_req_message);
139         CTDB_NO_MEMORY_VOID(ctdb, r);
140
141         talloc_set_name_const(r, "req_message packet");
142
143         r->srvid         = srvid;
144         r->datalen       = data.dsize;
145         memcpy(&r->data[0], data.dptr, data.dsize);
146
147         daemon_queue_send(client, &r->hdr);
148
149         talloc_free(r);
150 }
151                                            
152
153 /*
154   this is called when the ctdb daemon received a ctdb request to 
155   set the srvid from the client
156  */
157 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
158 {
159         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
160         int res;
161         if (client == NULL) {
162                 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
163                 return -1;
164         }
165         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
166         if (res != 0) {
167                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
168                          (unsigned long long)srvid));
169         } else {
170                 DEBUG(1,(__location__ " Registered message handler for srvid=%llu\n", 
171                          (unsigned long long)srvid));
172         }
173
174         /* this is a hack for Samba - we now know the pid of the Samba client */
175         if ((srvid & 0xFFFFFFFF) == srvid &&
176             kill(srvid, 0) == 0) {
177                 client->pid = srvid;
178                 DEBUG(1,(__location__ " Registered PID %u for client %u\n",
179                          (unsigned)client->pid, client_id));
180         }
181         return res;
182 }
183
184 /*
185   this is called when the ctdb daemon received a ctdb request to 
186   remove a srvid from the client
187  */
188 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
189 {
190         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
191         if (client == NULL) {
192                 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
193                 return -1;
194         }
195         return ctdb_deregister_message_handler(ctdb, srvid, client);
196 }
197
198
199 /*
200   destroy a ctdb_client
201 */
202 static int ctdb_client_destructor(struct ctdb_client *client)
203 {
204         ctdb_takeover_client_destructor_hook(client);
205         ctdb_reqid_remove(client->ctdb, client->client_id);
206         client->ctdb->statistics.num_clients--;
207         return 0;
208 }
209
210
211 /*
212   this is called when the ctdb daemon received a ctdb request message
213   from a local client over the unix domain socket
214  */
215 static void daemon_request_message_from_client(struct ctdb_client *client, 
216                                                struct ctdb_req_message *c)
217 {
218         TDB_DATA data;
219         int res;
220
221         /* maybe the message is for another client on this node */
222         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
223                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
224                 return;
225         }
226
227         /* its for a remote node */
228         data.dptr = &c->data[0];
229         data.dsize = c->datalen;
230         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
231                                        c->srvid, data);
232         if (res != 0) {
233                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
234                          c->hdr.destnode));
235         }
236 }
237
238
239 struct daemon_call_state {
240         struct ctdb_client *client;
241         uint32_t reqid;
242         struct ctdb_call *call;
243         struct timeval start_time;
244 };
245
246 /* 
247    complete a call from a client 
248 */
249 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
250 {
251         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
252                                                            struct daemon_call_state);
253         struct ctdb_reply_call *r;
254         int res;
255         uint32_t length;
256         struct ctdb_client *client = dstate->client;
257
258         talloc_steal(client, dstate);
259         talloc_steal(dstate, dstate->call);
260
261         res = ctdb_daemon_call_recv(state, dstate->call);
262         if (res != 0) {
263                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
264                 client->ctdb->statistics.pending_calls--;
265                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
266                 return;
267         }
268
269         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
270         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
271                                length, struct ctdb_reply_call);
272         if (r == NULL) {
273                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
274                 client->ctdb->statistics.pending_calls--;
275                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
276                 return;
277         }
278         r->hdr.reqid        = dstate->reqid;
279         r->datalen          = dstate->call->reply_data.dsize;
280         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
281
282         res = daemon_queue_send(client, &r->hdr);
283         if (res != 0) {
284                 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
287         talloc_free(dstate);
288         client->ctdb->statistics.pending_calls--;
289 }
290
291
292 static void daemon_request_call_from_client(struct ctdb_client *client, 
293                                             struct ctdb_req_call *c);
294
295 /*
296   this is called when the ctdb daemon received a ctdb request call
297   from a local client over the unix domain socket
298  */
299 static void daemon_request_call_from_client(struct ctdb_client *client, 
300                                             struct ctdb_req_call *c)
301 {
302         struct ctdb_call_state *state;
303         struct ctdb_db_context *ctdb_db;
304         struct daemon_call_state *dstate;
305         struct ctdb_call *call;
306         struct ctdb_ltdb_header header;
307         TDB_DATA key, data;
308         int ret;
309         struct ctdb_context *ctdb = client->ctdb;
310
311         ctdb->statistics.total_calls++;
312         ctdb->statistics.pending_calls++;
313
314         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
315         if (!ctdb_db) {
316                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
317                           c->db_id));
318                 ctdb->statistics.pending_calls--;
319                 return;
320         }
321
322         key.dptr = c->data;
323         key.dsize = c->keylen;
324
325         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
326                                            (struct ctdb_req_header *)c, &data,
327                                            daemon_incoming_packet, client, True);
328         if (ret == -2) {
329                 /* will retry later */
330                 ctdb->statistics.pending_calls--;
331                 return;
332         }
333
334         if (ret != 0) {
335                 DEBUG(0,(__location__ " Unable to fetch record\n"));
336                 ctdb->statistics.pending_calls--;
337                 return;
338         }
339
340         dstate = talloc(client, struct daemon_call_state);
341         if (dstate == NULL) {
342                 ctdb_ltdb_unlock(ctdb_db, key);
343                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
344                 ctdb->statistics.pending_calls--;
345                 return;
346         }
347         dstate->start_time = timeval_current();
348         dstate->client = client;
349         dstate->reqid  = c->hdr.reqid;
350         talloc_steal(dstate, data.dptr);
351
352         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
353         if (call == NULL) {
354                 ctdb_ltdb_unlock(ctdb_db, key);
355                 DEBUG(0,(__location__ " Unable to allocate call\n"));
356                 ctdb->statistics.pending_calls--;
357                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
358                 return;
359         }
360
361         call->call_id = c->callid;
362         call->key = key;
363         call->call_data.dptr = c->data + c->keylen;
364         call->call_data.dsize = c->calldatalen;
365         call->flags = c->flags;
366
367         if (header.dmaster == ctdb->pnn) {
368                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
369         } else {
370                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
371         }
372
373         ctdb_ltdb_unlock(ctdb_db, key);
374
375         if (state == NULL) {
376                 DEBUG(0,(__location__ " Unable to setup call send\n"));
377                 ctdb->statistics.pending_calls--;
378                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
379                 return;
380         }
381         talloc_steal(state, dstate);
382         talloc_steal(client, state);
383
384         state->async.fn = daemon_call_from_client_callback;
385         state->async.private_data = dstate;
386 }
387
388
389 static void daemon_request_control_from_client(struct ctdb_client *client, 
390                                                struct ctdb_req_control *c);
391
392 /* data contains a packet from the client */
393 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
394 {
395         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
396         TALLOC_CTX *tmp_ctx;
397         struct ctdb_context *ctdb = client->ctdb;
398
399         /* place the packet as a child of a tmp_ctx. We then use
400            talloc_free() below to free it. If any of the calls want
401            to keep it, then they will steal it somewhere else, and the
402            talloc_free() will be a no-op */
403         tmp_ctx = talloc_new(client);
404         talloc_steal(tmp_ctx, hdr);
405
406         if (hdr->ctdb_magic != CTDB_MAGIC) {
407                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
408                 goto done;
409         }
410
411         if (hdr->ctdb_version != CTDB_VERSION) {
412                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
413                 goto done;
414         }
415
416         switch (hdr->operation) {
417         case CTDB_REQ_CALL:
418                 ctdb->statistics.client.req_call++;
419                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
420                 break;
421
422         case CTDB_REQ_MESSAGE:
423                 ctdb->statistics.client.req_message++;
424                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
425                 break;
426
427         case CTDB_REQ_CONTROL:
428                 ctdb->statistics.client.req_control++;
429                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
430                 break;
431
432         default:
433                 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
434                          hdr->operation));
435         }
436
437 done:
438         talloc_free(tmp_ctx);
439 }
440
441 /*
442   called when the daemon gets a incoming packet
443  */
444 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
445 {
446         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
447         struct ctdb_req_header *hdr;
448
449         if (cnt == 0) {
450                 talloc_free(client);
451                 return;
452         }
453
454         client->ctdb->statistics.client_packets_recv++;
455
456         if (cnt < sizeof(*hdr)) {
457                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
458                                (unsigned)cnt);
459                 return;
460         }
461         hdr = (struct ctdb_req_header *)data;
462         if (cnt != hdr->length) {
463                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
464                                (unsigned)hdr->length, (unsigned)cnt);
465                 return;
466         }
467
468         if (hdr->ctdb_magic != CTDB_MAGIC) {
469                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
470                 return;
471         }
472
473         if (hdr->ctdb_version != CTDB_VERSION) {
474                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
475                 return;
476         }
477
478         DEBUG(3,(__location__ " client request %u of type %u length %u from "
479                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
480                  hdr->srcnode, hdr->destnode));
481
482         /* it is the responsibility of the incoming packet function to free 'data' */
483         daemon_incoming_packet(client, hdr);
484 }
485
486 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
487                          uint16_t flags, void *private_data)
488 {
489         struct sockaddr_in addr;
490         socklen_t len;
491         int fd;
492         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
493         struct ctdb_client *client;
494
495         memset(&addr, 0, sizeof(addr));
496         len = sizeof(addr);
497         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
498         if (fd == -1) {
499                 return;
500         }
501
502         set_nonblocking(fd);
503         set_close_on_exec(fd);
504
505         client = talloc_zero(ctdb, struct ctdb_client);
506         client->ctdb = ctdb;
507         client->fd = fd;
508         client->client_id = ctdb_reqid_new(ctdb, client);
509         ctdb->statistics.num_clients++;
510
511         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
512                                          ctdb_daemon_read_cb, client);
513
514         talloc_set_destructor(client, ctdb_client_destructor);
515 }
516
517
518
519 /*
520   create a unix domain socket and bind it
521   return a file descriptor open on the socket 
522 */
523 static int ux_socket_bind(struct ctdb_context *ctdb)
524 {
525         struct sockaddr_un addr;
526
527         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
528         if (ctdb->daemon.sd == -1) {
529                 return -1;
530         }
531
532         set_nonblocking(ctdb->daemon.sd);
533         set_close_on_exec(ctdb->daemon.sd);
534
535 #if 0
536         /* AIX doesn't like this :( */
537         if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
538             fchmod(ctdb->daemon.sd, 0700) != 0) {
539                 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
540                 goto failed;
541         }
542 #endif
543
544         set_nonblocking(ctdb->daemon.sd);
545
546         memset(&addr, 0, sizeof(addr));
547         addr.sun_family = AF_UNIX;
548         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
549
550         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
551                 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
552                 goto failed;
553         }       
554         if (listen(ctdb->daemon.sd, 10) != 0) {
555                 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
556                 goto failed;
557         }
558
559         return 0;
560
561 failed:
562         close(ctdb->daemon.sd);
563         ctdb->daemon.sd = -1;
564         return -1;      
565 }
566
567 /*
568   delete the socket on exit - called on destruction of autofree context
569  */
570 static int unlink_destructor(const char *name)
571 {
572         unlink(name);
573         return 0;
574 }
575
576 /*
577   start the protocol going as a daemon
578 */
579 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
580 {
581         int res, ret = -1;
582         struct fd_event *fde;
583         const char *domain_socket_name;
584
585         /* get rid of any old sockets */
586         unlink(ctdb->daemon.name);
587
588         /* create a unix domain stream socket to listen to */
589         res = ux_socket_bind(ctdb);
590         if (res!=0) {
591                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
592                 exit(10);
593         }
594
595         if (do_fork && fork()) {
596                 return 0;
597         }
598
599         tdb_reopen_all(False);
600
601         if (do_fork) {
602                 setsid();
603                 close(0);
604                 if (open("/dev/null", O_RDONLY) != 0) {
605                         DEBUG(0,(__location__ " Failed to setup stdin on /dev/null\n"));
606                         exit(11);
607                 }
608         }
609         block_signal(SIGPIPE);
610
611         if (ctdb->do_setsched) {
612                 /* try to set us up as realtime */
613                 ctdb_set_scheduler(ctdb);
614         }
615
616         /* ensure the socket is deleted on exit of the daemon */
617         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
618         talloc_set_destructor(domain_socket_name, unlink_destructor);   
619
620         ctdb->ev = event_context_init(NULL);
621
622         /* force initial recovery for election */
623         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
624
625         if (strcmp(ctdb->transport, "tcp") == 0) {
626                 int ctdb_tcp_init(struct ctdb_context *);
627                 ret = ctdb_tcp_init(ctdb);
628         }
629 #ifdef USE_INFINIBAND
630         if (strcmp(ctdb->transport, "ib") == 0) {
631                 int ctdb_ibw_init(struct ctdb_context *);
632                 ret = ctdb_ibw_init(ctdb);
633         }
634 #endif
635         if (ret != 0) {
636                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
637                 return -1;
638         }
639
640         /* initialise the transport  */
641         if (ctdb->methods->initialise(ctdb) != 0) {
642                 DEBUG(0,("transport failed to initialise!\n"));
643                 ctdb_fatal(ctdb, "transport failed to initialise");
644         }
645
646         /* attach to any existing persistent databases */
647         if (ctdb_attach_persistent(ctdb) != 0) {
648                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
649         }
650
651         /* start frozen, then let the first election sort things out */
652         if (!ctdb_blocking_freeze(ctdb)) {
653                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
654         }
655
656         /* now start accepting clients, only can do this once frozen */
657         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
658                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
659                            ctdb_accept_client, ctdb);
660
661         /* tell all other nodes we've just started up */
662         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
663                                  0, CTDB_CONTROL_STARTUP, 0,
664                                  CTDB_CTRL_FLAG_NOREPLY,
665                                  tdb_null, NULL, NULL);
666
667         /* release any IPs we hold from previous runs of the daemon */
668         ctdb_release_all_ips(ctdb);
669
670         /* start the transport going */
671         ctdb_start_transport(ctdb);
672
673         /* go into a wait loop to allow other nodes to complete */
674         event_loop_wait(ctdb->ev);
675
676         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
677         exit(1);
678 }
679
680 /*
681   allocate a packet for use in daemon<->daemon communication
682  */
683 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
684                                                  TALLOC_CTX *mem_ctx, 
685                                                  enum ctdb_operation operation, 
686                                                  size_t length, size_t slength,
687                                                  const char *type)
688 {
689         int size;
690         struct ctdb_req_header *hdr;
691
692         length = MAX(length, slength);
693         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
694
695         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
696         if (hdr == NULL) {
697                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
698                          operation, (unsigned)length));
699                 return NULL;
700         }
701         talloc_set_name_const(hdr, type);
702         memset(hdr, 0, slength);
703         hdr->length       = length;
704         hdr->operation    = operation;
705         hdr->ctdb_magic   = CTDB_MAGIC;
706         hdr->ctdb_version = CTDB_VERSION;
707         hdr->generation   = ctdb->vnn_map->generation;
708         hdr->srcnode      = ctdb->pnn;
709
710         return hdr;     
711 }
712
713 struct daemon_control_state {
714         struct daemon_control_state *next, *prev;
715         struct ctdb_client *client;
716         struct ctdb_req_control *c;
717         uint32_t reqid;
718         struct ctdb_node *node;
719 };
720
721 /*
722   callback when a control reply comes in
723  */
724 static void daemon_control_callback(struct ctdb_context *ctdb,
725                                     int32_t status, TDB_DATA data, 
726                                     const char *errormsg,
727                                     void *private_data)
728 {
729         struct daemon_control_state *state = talloc_get_type(private_data, 
730                                                              struct daemon_control_state);
731         struct ctdb_client *client = state->client;
732         struct ctdb_reply_control *r;
733         size_t len;
734
735         /* construct a message to send to the client containing the data */
736         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
737         if (errormsg) {
738                 len += strlen(errormsg);
739         }
740         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
741                                struct ctdb_reply_control);
742         CTDB_NO_MEMORY_VOID(ctdb, r);
743
744         r->hdr.reqid     = state->reqid;
745         r->status        = status;
746         r->datalen       = data.dsize;
747         r->errorlen = 0;
748         memcpy(&r->data[0], data.dptr, data.dsize);
749         if (errormsg) {
750                 r->errorlen = strlen(errormsg);
751                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
752         }
753
754         daemon_queue_send(client, &r->hdr);
755
756         talloc_free(state);
757 }
758
759 /*
760   fail all pending controls to a disconnected node
761  */
762 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
763 {
764         struct daemon_control_state *state;
765         while ((state = node->pending_controls)) {
766                 DLIST_REMOVE(node->pending_controls, state);
767                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
768                                         "node is disconnected", state);
769         }
770 }
771
772 /*
773   destroy a daemon_control_state
774  */
775 static int daemon_control_destructor(struct daemon_control_state *state)
776 {
777         if (state->node) {
778                 DLIST_REMOVE(state->node->pending_controls, state);
779         }
780         return 0;
781 }
782
783 /*
784   this is called when the ctdb daemon received a ctdb request control
785   from a local client over the unix domain socket
786  */
787 static void daemon_request_control_from_client(struct ctdb_client *client, 
788                                                struct ctdb_req_control *c)
789 {
790         TDB_DATA data;
791         int res;
792         struct daemon_control_state *state;
793         TALLOC_CTX *tmp_ctx = talloc_new(client);
794
795         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
796                 c->hdr.destnode = client->ctdb->pnn;
797         }
798
799         state = talloc(client, struct daemon_control_state);
800         CTDB_NO_MEMORY_VOID(client->ctdb, state);
801
802         state->client = client;
803         state->c = talloc_steal(state, c);
804         state->reqid = c->hdr.reqid;
805         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
806                 state->node = client->ctdb->nodes[c->hdr.destnode];
807                 DLIST_ADD(state->node->pending_controls, state);
808         } else {
809                 state->node = NULL;
810         }
811
812         talloc_set_destructor(state, daemon_control_destructor);
813
814         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
815                 talloc_steal(tmp_ctx, state);
816         }
817         
818         data.dptr = &c->data[0];
819         data.dsize = c->datalen;
820         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
821                                        c->srvid, c->opcode, client->client_id,
822                                        c->flags,
823                                        data, daemon_control_callback,
824                                        state);
825         if (res != 0) {
826                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
827                          c->hdr.destnode));
828         }
829
830         talloc_free(tmp_ctx);
831 }
832
833 /*
834   register a call function
835 */
836 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
837                          ctdb_fn_t fn, int id)
838 {
839         struct ctdb_registered_call *call;
840         struct ctdb_db_context *ctdb_db;
841
842         ctdb_db = find_ctdb_db(ctdb, db_id);
843         if (ctdb_db == NULL) {
844                 return -1;
845         }
846
847         call = talloc(ctdb_db, struct ctdb_registered_call);
848         call->fn = fn;
849         call->id = id;
850
851         DLIST_ADD(ctdb_db->calls, call);        
852         return 0;
853 }
854
855
856
857 /*
858   this local messaging handler is ugly, but is needed to prevent
859   recursion in ctdb_send_message() when the destination node is the
860   same as the source node
861  */
862 struct ctdb_local_message {
863         struct ctdb_context *ctdb;
864         uint64_t srvid;
865         TDB_DATA data;
866 };
867
868 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
869                                        struct timeval t, void *private_data)
870 {
871         struct ctdb_local_message *m = talloc_get_type(private_data, 
872                                                        struct ctdb_local_message);
873         int res;
874
875         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
876         if (res != 0) {
877                 DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", 
878                           (unsigned long long)m->srvid));
879         }
880         talloc_free(m);
881 }
882
883 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
884 {
885         struct ctdb_local_message *m;
886         m = talloc(ctdb, struct ctdb_local_message);
887         CTDB_NO_MEMORY(ctdb, m);
888
889         m->ctdb = ctdb;
890         m->srvid = srvid;
891         m->data  = data;
892         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
893         if (m->data.dptr == NULL) {
894                 talloc_free(m);
895                 return -1;
896         }
897
898         /* this needs to be done as an event to prevent recursion */
899         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
900         return 0;
901 }
902
903 /*
904   send a ctdb message
905 */
906 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
907                              uint64_t srvid, TDB_DATA data)
908 {
909         struct ctdb_req_message *r;
910         int len;
911
912         /* see if this is a message to ourselves */
913         if (pnn == ctdb->pnn) {
914                 return ctdb_local_message(ctdb, srvid, data);
915         }
916
917         len = offsetof(struct ctdb_req_message, data) + data.dsize;
918         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
919                                     struct ctdb_req_message);
920         CTDB_NO_MEMORY(ctdb, r);
921
922         r->hdr.destnode  = pnn;
923         r->srvid         = srvid;
924         r->datalen       = data.dsize;
925         memcpy(&r->data[0], data.dptr, data.dsize);
926
927         ctdb_queue_packet(ctdb, &r->hdr);
928
929         talloc_free(r);
930         return 0;
931 }
932