validate vnn on node flags change
[metze/ctdb/wip.git] / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
33
34 /*
35   handler for when a node changes its flags
36 */
37 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
38                                 TDB_DATA data, void *private_data)
39 {
40         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
41
42         if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) {
43                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
44                 return;
45         }
46
47         if (!ctdb_validate_vnn(ctdb, c->vnn)) {
48                 DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
49                 return;
50         }
51
52         /* don't get the disconnected flag from the other node */
53         ctdb->nodes[c->vnn]->flags = 
54                 (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
55                 | (c->flags & ~NODE_FLAGS_DISCONNECTED);        
56         DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));
57 }
58
59 /* called when the "startup" event script has finished */
60 static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
61 {
62         if (status != 0) {
63                 DEBUG(0,("startup event failed!\n"));
64                 ctdb_fatal(ctdb, "startup event script failed");                
65         }
66
67         /* start the transport running */
68         if (ctdb->methods->start(ctdb) != 0) {
69                 DEBUG(0,("transport failed to start!\n"));
70                 ctdb_fatal(ctdb, "transport failed to start");
71         }
72
73         /* start the recovery daemon process */
74         if (ctdb_start_recoverd(ctdb) != 0) {
75                 DEBUG(0,("Failed to start recovery daemon\n"));
76                 exit(11);
77         }
78
79         /* a handler for when nodes are disabled/enabled */
80         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
81                                       flag_change_handler, NULL);
82
83         /* start monitoring for dead nodes */
84         ctdb_start_monitoring(ctdb);
85 }
86
87 /* go into main ctdb loop */
88 static void ctdb_main_loop(struct ctdb_context *ctdb)
89 {
90         int ret = -1;
91
92         if (strcmp(ctdb->transport, "tcp") == 0) {
93                 int ctdb_tcp_init(struct ctdb_context *);
94                 ret = ctdb_tcp_init(ctdb);
95         }
96 #ifdef USE_INFINIBAND
97         if (strcmp(ctdb->transport, "ib") == 0) {
98                 int ctdb_ibw_init(struct ctdb_context *);
99                 ret = ctdb_ibw_init(ctdb);
100         }
101 #endif
102         if (ret != 0) {
103                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
104                 return;
105         }
106
107         /* initialise the transport  */
108         if (ctdb->methods->initialise(ctdb) != 0) {
109                 DEBUG(0,("transport failed to initialise!\n"));
110                 ctdb_fatal(ctdb, "transport failed to initialise");
111         }
112
113         /* tell all other nodes we've just started up */
114         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
115                                  0, CTDB_CONTROL_STARTUP, 0,
116                                  CTDB_CTRL_FLAG_NOREPLY,
117                                  tdb_null, NULL, NULL);
118
119         ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
120                                          ctdb_start_transport, NULL, "startup");
121         if (ret != 0) {
122                 DEBUG(0,("Failed startup event script\n"));
123                 return;
124         }
125
126         /* go into a wait loop to allow other nodes to complete */
127         event_loop_wait(ctdb->ev);
128
129         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
130         exit(1);
131 }
132
133
134 static void block_signal(int signum)
135 {
136         struct sigaction act;
137
138         memset(&act, 0, sizeof(act));
139
140         act.sa_handler = SIG_IGN;
141         sigemptyset(&act.sa_mask);
142         sigaddset(&act.sa_mask, signum);
143         sigaction(signum, &act, NULL);
144 }
145
146
147 /*
148   send a packet to a client
149  */
150 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
151 {
152         client->ctdb->statistics.client_packets_sent++;
153         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
154 }
155
156 /*
157   message handler for when we are in daemon mode. This redirects the message
158   to the right client
159  */
160 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
161                                     TDB_DATA data, void *private_data)
162 {
163         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
164         struct ctdb_req_message *r;
165         int len;
166
167         /* construct a message to send to the client containing the data */
168         len = offsetof(struct ctdb_req_message, data) + data.dsize;
169         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
170                                len, struct ctdb_req_message);
171         CTDB_NO_MEMORY_VOID(ctdb, r);
172
173         talloc_set_name_const(r, "req_message packet");
174
175         r->srvid         = srvid;
176         r->datalen       = data.dsize;
177         memcpy(&r->data[0], data.dptr, data.dsize);
178
179         daemon_queue_send(client, &r->hdr);
180
181         talloc_free(r);
182 }
183                                            
184
185 /*
186   this is called when the ctdb daemon received a ctdb request to 
187   set the srvid from the client
188  */
189 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
190 {
191         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
192         int res;
193         if (client == NULL) {
194                 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
195                 return -1;
196         }
197         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
198         if (res != 0) {
199                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
200                          (unsigned long long)srvid));
201         } else {
202                 DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
203                          (unsigned long long)srvid));
204         }
205         return res;
206 }
207
208 /*
209   this is called when the ctdb daemon received a ctdb request to 
210   remove a srvid from the client
211  */
212 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
213 {
214         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
215         if (client == NULL) {
216                 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
217                 return -1;
218         }
219         return ctdb_deregister_message_handler(ctdb, srvid, client);
220 }
221
222
223 /*
224   destroy a ctdb_client
225 */
226 static int ctdb_client_destructor(struct ctdb_client *client)
227 {
228         ctdb_takeover_client_destructor_hook(client);
229         ctdb_reqid_remove(client->ctdb, client->client_id);
230         client->ctdb->statistics.num_clients--;
231         return 0;
232 }
233
234
235 /*
236   this is called when the ctdb daemon received a ctdb request message
237   from a local client over the unix domain socket
238  */
239 static void daemon_request_message_from_client(struct ctdb_client *client, 
240                                                struct ctdb_req_message *c)
241 {
242         TDB_DATA data;
243         int res;
244
245         /* maybe the message is for another client on this node */
246         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
247                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
248                 return;
249         }
250
251         /* its for a remote node */
252         data.dptr = &c->data[0];
253         data.dsize = c->datalen;
254         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
255                                        c->srvid, data);
256         if (res != 0) {
257                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
258                          c->hdr.destnode));
259         }
260 }
261
262
263 struct daemon_call_state {
264         struct ctdb_client *client;
265         uint32_t reqid;
266         struct ctdb_call *call;
267         struct timeval start_time;
268 };
269
270 /* 
271    complete a call from a client 
272 */
273 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
274 {
275         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
276                                                            struct daemon_call_state);
277         struct ctdb_reply_call *r;
278         int res;
279         uint32_t length;
280         struct ctdb_client *client = dstate->client;
281
282         talloc_steal(client, dstate);
283         talloc_steal(dstate, dstate->call);
284
285         res = ctdb_daemon_call_recv(state, dstate->call);
286         if (res != 0) {
287                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
288                 client->ctdb->statistics.pending_calls--;
289                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
290                 return;
291         }
292
293         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
294         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
295                                length, struct ctdb_reply_call);
296         if (r == NULL) {
297                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
298                 client->ctdb->statistics.pending_calls--;
299                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
300                 return;
301         }
302         r->hdr.reqid        = dstate->reqid;
303         r->datalen          = dstate->call->reply_data.dsize;
304         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
305
306         res = daemon_queue_send(client, &r->hdr);
307         if (res != 0) {
308                 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
309         }
310         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
311         talloc_free(dstate);
312         client->ctdb->statistics.pending_calls--;
313 }
314
315
316 static void daemon_request_call_from_client(struct ctdb_client *client, 
317                                             struct ctdb_req_call *c);
318
319 /*
320   this is called when the ctdb daemon received a ctdb request call
321   from a local client over the unix domain socket
322  */
323 static void daemon_request_call_from_client(struct ctdb_client *client, 
324                                             struct ctdb_req_call *c)
325 {
326         struct ctdb_call_state *state;
327         struct ctdb_db_context *ctdb_db;
328         struct daemon_call_state *dstate;
329         struct ctdb_call *call;
330         struct ctdb_ltdb_header header;
331         TDB_DATA key, data;
332         int ret;
333         struct ctdb_context *ctdb = client->ctdb;
334
335         ctdb->statistics.total_calls++;
336         ctdb->statistics.pending_calls++;
337
338         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
339         if (!ctdb_db) {
340                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
341                           c->db_id));
342                 ctdb->statistics.pending_calls--;
343                 return;
344         }
345
346         key.dptr = c->data;
347         key.dsize = c->keylen;
348
349         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
350                                            (struct ctdb_req_header *)c, &data,
351                                            daemon_incoming_packet, client, True);
352         if (ret == -2) {
353                 /* will retry later */
354                 ctdb->statistics.pending_calls--;
355                 return;
356         }
357
358         if (ret != 0) {
359                 DEBUG(0,(__location__ " Unable to fetch record\n"));
360                 ctdb->statistics.pending_calls--;
361                 return;
362         }
363
364         dstate = talloc(client, struct daemon_call_state);
365         if (dstate == NULL) {
366                 ctdb_ltdb_unlock(ctdb_db, key);
367                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
368                 ctdb->statistics.pending_calls--;
369                 return;
370         }
371         dstate->start_time = timeval_current();
372         dstate->client = client;
373         dstate->reqid  = c->hdr.reqid;
374         talloc_steal(dstate, data.dptr);
375
376         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
377         if (call == NULL) {
378                 ctdb_ltdb_unlock(ctdb_db, key);
379                 DEBUG(0,(__location__ " Unable to allocate call\n"));
380                 ctdb->statistics.pending_calls--;
381                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
382                 return;
383         }
384
385         call->call_id = c->callid;
386         call->key = key;
387         call->call_data.dptr = c->data + c->keylen;
388         call->call_data.dsize = c->calldatalen;
389         call->flags = c->flags;
390
391         if (header.dmaster == ctdb->vnn) {
392                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
393         } else {
394                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
395         }
396
397         ctdb_ltdb_unlock(ctdb_db, key);
398
399         if (state == NULL) {
400                 DEBUG(0,(__location__ " Unable to setup call send\n"));
401                 ctdb->statistics.pending_calls--;
402                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
403                 return;
404         }
405         talloc_steal(state, dstate);
406         talloc_steal(client, state);
407
408         state->async.fn = daemon_call_from_client_callback;
409         state->async.private_data = dstate;
410 }
411
412
413 static void daemon_request_control_from_client(struct ctdb_client *client, 
414                                                struct ctdb_req_control *c);
415
416 /* data contains a packet from the client */
417 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
418 {
419         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
420         TALLOC_CTX *tmp_ctx;
421         struct ctdb_context *ctdb = client->ctdb;
422
423         /* place the packet as a child of a tmp_ctx. We then use
424            talloc_free() below to free it. If any of the calls want
425            to keep it, then they will steal it somewhere else, and the
426            talloc_free() will be a no-op */
427         tmp_ctx = talloc_new(client);
428         talloc_steal(tmp_ctx, hdr);
429
430         if (hdr->ctdb_magic != CTDB_MAGIC) {
431                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
432                 goto done;
433         }
434
435         if (hdr->ctdb_version != CTDB_VERSION) {
436                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
437                 goto done;
438         }
439
440         switch (hdr->operation) {
441         case CTDB_REQ_CALL:
442                 ctdb->statistics.client.req_call++;
443                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
444                 break;
445
446         case CTDB_REQ_MESSAGE:
447                 ctdb->statistics.client.req_message++;
448                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
449                 break;
450
451         case CTDB_REQ_CONTROL:
452                 ctdb->statistics.client.req_control++;
453                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
454                 break;
455
456         default:
457                 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
458                          hdr->operation));
459         }
460
461 done:
462         talloc_free(tmp_ctx);
463 }
464
465 /*
466   called when the daemon gets a incoming packet
467  */
468 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
469 {
470         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
471         struct ctdb_req_header *hdr;
472
473         if (cnt == 0) {
474                 talloc_free(client);
475                 return;
476         }
477
478         client->ctdb->statistics.client_packets_recv++;
479
480         if (cnt < sizeof(*hdr)) {
481                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
482                                (unsigned)cnt);
483                 return;
484         }
485         hdr = (struct ctdb_req_header *)data;
486         if (cnt != hdr->length) {
487                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
488                                (unsigned)hdr->length, (unsigned)cnt);
489                 return;
490         }
491
492         if (hdr->ctdb_magic != CTDB_MAGIC) {
493                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
494                 return;
495         }
496
497         if (hdr->ctdb_version != CTDB_VERSION) {
498                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
499                 return;
500         }
501
502         DEBUG(3,(__location__ " client request %u of type %u length %u from "
503                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
504                  hdr->srcnode, hdr->destnode));
505
506         /* it is the responsibility of the incoming packet function to free 'data' */
507         daemon_incoming_packet(client, hdr);
508 }
509
510 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
511                          uint16_t flags, void *private_data)
512 {
513         struct sockaddr_in addr;
514         socklen_t len;
515         int fd;
516         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
517         struct ctdb_client *client;
518
519         memset(&addr, 0, sizeof(addr));
520         len = sizeof(addr);
521         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
522         if (fd == -1) {
523                 return;
524         }
525
526         set_nonblocking(fd);
527         set_close_on_exec(fd);
528
529         client = talloc_zero(ctdb, struct ctdb_client);
530         client->ctdb = ctdb;
531         client->fd = fd;
532         client->client_id = ctdb_reqid_new(ctdb, client);
533         ctdb->statistics.num_clients++;
534
535         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
536                                          ctdb_daemon_read_cb, client);
537
538         talloc_set_destructor(client, ctdb_client_destructor);
539 }
540
541
542
543 /*
544   create a unix domain socket and bind it
545   return a file descriptor open on the socket 
546 */
547 static int ux_socket_bind(struct ctdb_context *ctdb)
548 {
549         struct sockaddr_un addr;
550
551         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
552         if (ctdb->daemon.sd == -1) {
553                 return -1;
554         }
555
556         set_nonblocking(ctdb->daemon.sd);
557         set_close_on_exec(ctdb->daemon.sd);
558
559 #if 0
560         /* AIX doesn't like this :( */
561         if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
562             fchmod(ctdb->daemon.sd, 0700) != 0) {
563                 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
564                 goto failed;
565         }
566 #endif
567
568         set_nonblocking(ctdb->daemon.sd);
569
570         memset(&addr, 0, sizeof(addr));
571         addr.sun_family = AF_UNIX;
572         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
573
574         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
575                 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
576                 goto failed;
577         }       
578         if (listen(ctdb->daemon.sd, 10) != 0) {
579                 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
580                 goto failed;
581         }
582
583         return 0;
584
585 failed:
586         close(ctdb->daemon.sd);
587         ctdb->daemon.sd = -1;
588         return -1;      
589 }
590
591 /*
592   delete the socket on exit - called on destruction of autofree context
593  */
594 static int unlink_destructor(const char *name)
595 {
596         unlink(name);
597         return 0;
598 }
599
600
601 /*
602   start the protocol going as a daemon
603 */
604 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
605 {
606         int res;
607         struct fd_event *fde;
608         const char *domain_socket_name;
609
610         /* get rid of any old sockets */
611         unlink(ctdb->daemon.name);
612
613         /* create a unix domain stream socket to listen to */
614         res = ux_socket_bind(ctdb);
615         if (res!=0) {
616                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
617                 exit(10);
618         }
619
620         if (do_fork && fork()) {
621                 return 0;
622         }
623
624         tdb_reopen_all(False);
625
626         if (do_fork) {
627                 setsid();
628         }
629         block_signal(SIGPIPE);
630
631         /* try to set us up as realtime */
632         ctdb_set_realtime(true);
633
634         /* ensure the socket is deleted on exit of the daemon */
635         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
636         talloc_set_destructor(domain_socket_name, unlink_destructor);   
637
638         ctdb->ev = event_context_init(NULL);
639
640         /* start frozen, then let the first election sort things out */
641         if (!ctdb_blocking_freeze(ctdb)) {
642                 DEBUG(0,("Failed to get initial freeze\n"));
643                 exit(12);
644         }
645
646         /* force initial recovery for election */
647         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648
649         /* now start accepting clients, only can do this once frozen */
650         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
651                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
652                            ctdb_accept_client, ctdb);
653
654         ctdb_main_loop(ctdb);
655
656         return 0;
657 }
658
659 /*
660   allocate a packet for use in client<->daemon communication
661  */
662 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
663                                             TALLOC_CTX *mem_ctx, 
664                                             enum ctdb_operation operation, 
665                                             size_t length, size_t slength,
666                                             const char *type)
667 {
668         int size;
669         struct ctdb_req_header *hdr;
670
671         length = MAX(length, slength);
672         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
673
674         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
675         if (hdr == NULL) {
676                 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
677                          operation, (unsigned)length));
678                 return NULL;
679         }
680         talloc_set_name_const(hdr, type);
681         memset(hdr, 0, slength);
682         hdr->length       = length;
683         hdr->operation    = operation;
684         hdr->ctdb_magic   = CTDB_MAGIC;
685         hdr->ctdb_version = CTDB_VERSION;
686         hdr->srcnode      = ctdb->vnn;
687         if (ctdb->vnn_map) {
688                 hdr->generation = ctdb->vnn_map->generation;
689         }
690
691         return hdr;
692 }
693
694
695 /*
696   allocate a packet for use in daemon<->daemon communication
697  */
698 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
699                                                  TALLOC_CTX *mem_ctx, 
700                                                  enum ctdb_operation operation, 
701                                                  size_t length, size_t slength,
702                                                  const char *type)
703 {
704         int size;
705         struct ctdb_req_header *hdr;
706
707         length = MAX(length, slength);
708         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
709
710         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
711         if (hdr == NULL) {
712                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
713                          operation, (unsigned)length));
714                 return NULL;
715         }
716         talloc_set_name_const(hdr, type);
717         memset(hdr, 0, slength);
718         hdr->length       = length;
719         hdr->operation    = operation;
720         hdr->ctdb_magic   = CTDB_MAGIC;
721         hdr->ctdb_version = CTDB_VERSION;
722         hdr->generation   = ctdb->vnn_map->generation;
723         hdr->srcnode      = ctdb->vnn;
724
725         return hdr;     
726 }
727
728 struct daemon_control_state {
729         struct daemon_control_state *next, *prev;
730         struct ctdb_client *client;
731         struct ctdb_req_control *c;
732         uint32_t reqid;
733         struct ctdb_node *node;
734 };
735
736 /*
737   callback when a control reply comes in
738  */
739 static void daemon_control_callback(struct ctdb_context *ctdb,
740                                     int32_t status, TDB_DATA data, 
741                                     const char *errormsg,
742                                     void *private_data)
743 {
744         struct daemon_control_state *state = talloc_get_type(private_data, 
745                                                              struct daemon_control_state);
746         struct ctdb_client *client = state->client;
747         struct ctdb_reply_control *r;
748         size_t len;
749
750         /* construct a message to send to the client containing the data */
751         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
752         if (errormsg) {
753                 len += strlen(errormsg);
754         }
755         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
756                                struct ctdb_reply_control);
757         CTDB_NO_MEMORY_VOID(ctdb, r);
758
759         r->hdr.reqid     = state->reqid;
760         r->status        = status;
761         r->datalen       = data.dsize;
762         r->errorlen = 0;
763         memcpy(&r->data[0], data.dptr, data.dsize);
764         if (errormsg) {
765                 r->errorlen = strlen(errormsg);
766                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
767         }
768
769         daemon_queue_send(client, &r->hdr);
770
771         talloc_free(state);
772 }
773
774 /*
775   fail all pending controls to a disconnected node
776  */
777 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
778 {
779         struct daemon_control_state *state;
780         while ((state = node->pending_controls)) {
781                 DLIST_REMOVE(node->pending_controls, state);
782                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
783                                         "node is disconnected", state);
784         }
785 }
786
787 /*
788   destroy a daemon_control_state
789  */
790 static int daemon_control_destructor(struct daemon_control_state *state)
791 {
792         if (state->node) {
793                 DLIST_REMOVE(state->node->pending_controls, state);
794         }
795         return 0;
796 }
797
798 /*
799   this is called when the ctdb daemon received a ctdb request control
800   from a local client over the unix domain socket
801  */
802 static void daemon_request_control_from_client(struct ctdb_client *client, 
803                                                struct ctdb_req_control *c)
804 {
805         TDB_DATA data;
806         int res;
807         struct daemon_control_state *state;
808         TALLOC_CTX *tmp_ctx = talloc_new(client);
809
810         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
811                 c->hdr.destnode = client->ctdb->vnn;
812         }
813
814         state = talloc(client, struct daemon_control_state);
815         CTDB_NO_MEMORY_VOID(client->ctdb, state);
816
817         state->client = client;
818         state->c = talloc_steal(state, c);
819         state->reqid = c->hdr.reqid;
820         if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) {
821                 state->node = client->ctdb->nodes[c->hdr.destnode];
822                 DLIST_ADD(state->node->pending_controls, state);
823         } else {
824                 state->node = NULL;
825         }
826
827         talloc_set_destructor(state, daemon_control_destructor);
828
829         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
830                 talloc_steal(tmp_ctx, state);
831         }
832         
833         data.dptr = &c->data[0];
834         data.dsize = c->datalen;
835         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
836                                        c->srvid, c->opcode, client->client_id,
837                                        c->flags,
838                                        data, daemon_control_callback,
839                                        state);
840         if (res != 0) {
841                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
842                          c->hdr.destnode));
843         }
844
845         talloc_free(tmp_ctx);
846 }
847
848 /*
849   register a call function
850 */
851 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
852                          ctdb_fn_t fn, int id)
853 {
854         struct ctdb_registered_call *call;
855         struct ctdb_db_context *ctdb_db;
856
857         ctdb_db = find_ctdb_db(ctdb, db_id);
858         if (ctdb_db == NULL) {
859                 return -1;
860         }
861
862         call = talloc(ctdb_db, struct ctdb_registered_call);
863         call->fn = fn;
864         call->id = id;
865
866         DLIST_ADD(ctdb_db->calls, call);        
867         return 0;
868 }