5d619e0a9078d99cd2b0f1485f791a453cfb4998
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   structure describing a connected client in the daemon
34  */
35 struct ctdb_client {
36         struct ctdb_context *ctdb;
37         int fd;
38         struct ctdb_queue *queue;
39 };
40
41
42
43 static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
44
45 static void ctdb_main_loop(struct ctdb_context *ctdb)
46 {
47         int ret = -1;
48
49         if (strcmp(ctdb->transport, "tcp") == 0) {
50                 int ctdb_tcp_init(struct ctdb_context *);
51                 ret = ctdb_tcp_init(ctdb);
52         }
53 #ifdef USE_INFINIBAND
54         if (strcmp(ctdb->transport, "ib") == 0) {
55                 int ctdb_ibw_init(struct ctdb_context *);
56                 ret = ctdb_ibw_init(ctdb);
57         }
58 #endif
59         if (ret != 0) {
60                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
61                 return;
62         }
63
64         /* start the transport running */
65         ctdb->methods->start(ctdb);
66
67         /* go into a wait loop to allow other nodes to complete */
68         event_loop_wait(ctdb->ev);
69
70         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
71         exit(1);
72 }
73
74
75 static void set_non_blocking(int fd)
76 {
77         unsigned v;
78         v = fcntl(fd, F_GETFL, 0);
79         fcntl(fd, F_SETFL, v | O_NONBLOCK);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->status.client_packets_sent++;
101         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
102 }
103
104 /*
105   message handler for when we are in daemon mode. This redirects the message
106   to the right client
107  */
108 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
109                                     TDB_DATA data, void *private_data)
110 {
111         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
112         struct ctdb_req_message *r;
113         int len;
114
115         /* construct a message to send to the client containing the data */
116         len = offsetof(struct ctdb_req_message, data) + data.dsize;
117         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
118                                len, struct ctdb_req_message);
119         CTDB_NO_MEMORY_VOID(ctdb, r);
120
121         talloc_set_name_const(r, "req_message packet");
122
123         r->srvid         = srvid;
124         r->datalen       = data.dsize;
125         memcpy(&r->data[0], data.dptr, data.dsize);
126
127         daemon_queue_send(client, &r->hdr);
128
129         talloc_free(r);
130 }
131                                            
132
133 /*
134   this is called when the ctdb daemon received a ctdb request to 
135   set the srvid from the client
136  */
137 static void daemon_request_register_message_handler(struct ctdb_client *client, 
138                                                     struct ctdb_req_register *c)
139 {
140         int res;
141         res = ctdb_register_message_handler(client->ctdb, client, 
142                                             c->srvid, daemon_message_handler, 
143                                             client);
144         if (res != 0) {
145                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
146                          c->srvid));
147         } else {
148                 DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
149                          c->srvid));
150         }
151 }
152
153
154 /*
155   called when the daemon gets a shutdown request from a client
156  */
157 static void daemon_request_shutdown(struct ctdb_client *client, 
158                                       struct ctdb_req_shutdown *f)
159 {
160         struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
161         int len;
162         uint32_t node;
163
164         /* we dont send to ourself so we can already count one daemon as
165            exiting */
166         ctdb->num_finished++;
167
168
169         /* loop over all nodes of the cluster */
170         for (node=0; node<ctdb->num_nodes;node++) {
171                 struct ctdb_req_finished *rf;
172
173                 /* dont send a message to ourself */
174                 if (ctdb->vnn == node) {
175                         continue;
176                 }
177
178                 len = sizeof(struct ctdb_req_finished);
179                 rf = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_FINISHED, len,
180                                              struct ctdb_req_finished);
181                 CTDB_NO_MEMORY_FATAL(ctdb, rf);
182
183                 rf->hdr.destnode  = node;
184
185                 ctdb_queue_packet(ctdb, &(rf->hdr));
186
187                 talloc_free(rf);
188         }
189
190         /* wait until all nodes have are prepared to shutdown */
191         while (ctdb->num_finished != ctdb->num_nodes) {
192                 event_loop_once(ctdb->ev);
193         }
194
195         /* all daemons have requested to finish - we now exit */
196         DEBUG(1,("All daemons finished - exiting\n"));
197         _exit(0);
198 }
199
200
201
202 /*
203   called when the daemon gets a connect wait request from a client
204  */
205 static void daemon_request_connect_wait(struct ctdb_client *client, 
206                                         struct ctdb_req_connect_wait *c)
207 {
208         struct ctdb_reply_connect_wait *r;
209         int res;
210
211         /* first wait - in the daemon */
212         ctdb_daemon_connect_wait(client->ctdb);
213
214         /* now send the reply */
215         r = ctdbd_allocate_pkt(client->ctdb, client, CTDB_REPLY_CONNECT_WAIT, sizeof(*r), 
216                                struct ctdb_reply_connect_wait);
217         CTDB_NO_MEMORY_VOID(client->ctdb, r);
218         r->vnn           = ctdb_get_vnn(client->ctdb);
219         r->num_connected = client->ctdb->num_connected;
220         
221         res = daemon_queue_send(client, &r->hdr);
222         talloc_free(r);
223         if (res != 0) {
224                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
225                 return;
226         }
227 }
228
229
230 /*
231   destroy a ctdb_client
232 */
233 static int ctdb_client_destructor(struct ctdb_client *client)
234 {
235         client->ctdb->num_clients--;
236         close(client->fd);
237         client->fd = -1;
238         return 0;
239 }
240
241
242 /*
243   this is called when the ctdb daemon received a ctdb request message
244   from a local client over the unix domain socket
245  */
246 static void daemon_request_message_from_client(struct ctdb_client *client, 
247                                                struct ctdb_req_message *c)
248 {
249         TDB_DATA data;
250         int res;
251
252         /* maybe the message is for another client on this node */
253         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
254                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
255                 return;
256         }
257
258         /* its for a remote node */
259         data.dptr = &c->data[0];
260         data.dsize = c->datalen;
261         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
262                                        c->srvid, data);
263         if (res != 0) {
264                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
265                          c->hdr.destnode));
266         }
267 }
268
269
270 struct daemon_call_state {
271         struct ctdb_client *client;
272         uint32_t reqid;
273         struct ctdb_call *call;
274         struct timeval start_time;
275 };
276
277 /* 
278    complete a call from a client 
279 */
280 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
281 {
282         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
283                                                            struct daemon_call_state);
284         struct ctdb_reply_call *r;
285         int res;
286         uint32_t length;
287         struct ctdb_client *client = dstate->client;
288
289         talloc_steal(client, dstate);
290         talloc_steal(dstate, dstate->call);
291
292         res = ctdb_daemon_call_recv(state, dstate->call);
293         if (res != 0) {
294                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
295                 client->ctdb->status.pending_calls--;
296                 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
297                 return;
298         }
299
300         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
301         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
302                                length, struct ctdb_reply_call);
303         if (r == NULL) {
304                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
305                 client->ctdb->status.pending_calls--;
306                 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
307                 return;
308         }
309         r->hdr.reqid        = dstate->reqid;
310         r->datalen          = dstate->call->reply_data.dsize;
311         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
312
313         res = daemon_queue_send(client, &r->hdr);
314         if (res != 0) {
315                 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
316         }
317         ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
318         talloc_free(dstate);
319         client->ctdb->status.pending_calls--;
320 }
321
322
323 /*
324   this is called when the ctdb daemon received a ctdb request call
325   from a local client over the unix domain socket
326  */
327 static void daemon_request_call_from_client(struct ctdb_client *client, 
328                                             struct ctdb_req_call *c)
329 {
330         struct ctdb_call_state *state;
331         struct ctdb_db_context *ctdb_db;
332         struct daemon_call_state *dstate;
333         struct ctdb_call *call;
334         struct ctdb_ltdb_header header;
335         TDB_DATA key, data;
336         int ret;
337         struct ctdb_context *ctdb = client->ctdb;
338
339         ctdb->status.total_calls++;
340         ctdb->status.pending_calls++;
341
342         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
343         if (!ctdb_db) {
344                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
345                           c->db_id));
346                 ctdb->status.pending_calls--;
347                 return;
348         }
349
350         key.dptr = c->data;
351         key.dsize = c->keylen;
352
353         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
354                                            (struct ctdb_req_header *)c, &data,
355                                            daemon_incoming_packet, client);
356         if (ret == -2) {
357                 /* will retry later */
358                 ctdb->status.pending_calls--;
359                 return;
360         }
361
362         if (ret != 0) {
363                 DEBUG(0,(__location__ " Unable to fetch record\n"));
364                 ctdb->status.pending_calls--;
365                 return;
366         }
367
368         dstate = talloc(client, struct daemon_call_state);
369         if (dstate == NULL) {
370                 ctdb_ltdb_unlock(ctdb_db, key);
371                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
372                 ctdb->status.pending_calls--;
373                 return;
374         }
375         dstate->start_time = timeval_current();
376         dstate->client = client;
377         dstate->reqid  = c->hdr.reqid;
378         talloc_steal(dstate, data.dptr);
379
380         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
381         if (call == NULL) {
382                 ctdb_ltdb_unlock(ctdb_db, key);
383                 DEBUG(0,(__location__ " Unable to allocate call\n"));
384                 ctdb->status.pending_calls--;
385                 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
386                 return;
387         }
388
389         call->call_id = c->callid;
390         call->key = key;
391         call->call_data.dptr = c->data + c->keylen;
392         call->call_data.dsize = c->calldatalen;
393         call->flags = c->flags;
394
395         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
396                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
397         } else {
398                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
399         }
400
401         ctdb_ltdb_unlock(ctdb_db, key);
402
403         if (state == NULL) {
404                 DEBUG(0,(__location__ " Unable to setup call send\n"));
405                 ctdb->status.pending_calls--;
406                 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
407                 return;
408         }
409         talloc_steal(state, dstate);
410         talloc_steal(client, state);
411
412         state->async.fn = daemon_call_from_client_callback;
413         state->async.private_data = dstate;
414 }
415
416
417 static void daemon_request_control_from_client(struct ctdb_client *client, 
418                                                struct ctdb_req_control *c);
419
420 /* data contains a packet from the client */
421 static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
422 {
423         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
424         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
425         TALLOC_CTX *tmp_ctx;
426         struct ctdb_context *ctdb = client->ctdb;
427
428         /* place the packet as a child of a tmp_ctx. We then use
429            talloc_free() below to free it. If any of the calls want
430            to keep it, then they will steal it somewhere else, and the
431            talloc_free() will be a no-op */
432         tmp_ctx = talloc_new(client);
433         talloc_steal(tmp_ctx, hdr);
434
435         if (hdr->ctdb_magic != CTDB_MAGIC) {
436                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
437                 goto done;
438         }
439
440         if (hdr->ctdb_version != CTDB_VERSION) {
441                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
442                 goto done;
443         }
444
445         switch (hdr->operation) {
446         case CTDB_REQ_CALL:
447                 ctdb->status.client.req_call++;
448                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
449                 break;
450
451         case CTDB_REQ_REGISTER:
452                 ctdb->status.client.req_register++;
453                 daemon_request_register_message_handler(client, 
454                                                         (struct ctdb_req_register *)hdr);
455                 break;
456
457         case CTDB_REQ_MESSAGE:
458                 ctdb->status.client.req_message++;
459                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
460                 break;
461
462         case CTDB_REQ_CONNECT_WAIT:
463                 ctdb->status.client.req_connect_wait++;
464                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
465                 break;
466
467         case CTDB_REQ_SHUTDOWN:
468                 ctdb->status.client.req_shutdown++;
469                 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
470                 break;
471
472         case CTDB_REQ_CONTROL:
473                 ctdb->status.client.req_control++;
474                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
475                 break;
476
477         default:
478                 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
479                          hdr->operation));
480         }
481
482 done:
483         talloc_free(tmp_ctx);
484 }
485
486 /*
487   called when the daemon gets a incoming packet
488  */
489 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
490 {
491         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
492         struct ctdb_req_header *hdr;
493
494         if (cnt == 0) {
495                 talloc_free(client);
496                 return;
497         }
498
499         client->ctdb->status.client_packets_recv++;
500
501         if (cnt < sizeof(*hdr)) {
502                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
503                                (unsigned)cnt);
504                 return;
505         }
506         hdr = (struct ctdb_req_header *)data;
507         if (cnt != hdr->length) {
508                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
509                                (unsigned)hdr->length, (unsigned)cnt);
510                 return;
511         }
512
513         if (hdr->ctdb_magic != CTDB_MAGIC) {
514                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
515                 return;
516         }
517
518         if (hdr->ctdb_version != CTDB_VERSION) {
519                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
520                 return;
521         }
522
523         DEBUG(3,(__location__ " client request %d of type %d length %d from "
524                  "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
525                  hdr->srcnode, hdr->destnode));
526
527         /* it is the responsibility of the incoming packet function to free 'data' */
528         daemon_incoming_packet(client, data, cnt);
529 }
530
531 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
532                          uint16_t flags, void *private_data)
533 {
534         struct sockaddr_in addr;
535         socklen_t len;
536         int fd;
537         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
538         struct ctdb_client *client;
539
540         memset(&addr, 0, sizeof(addr));
541         len = sizeof(addr);
542         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
543         if (fd == -1) {
544                 return;
545         }
546         set_non_blocking(fd);
547
548         client = talloc_zero(ctdb, struct ctdb_client);
549         client->ctdb = ctdb;
550         client->fd = fd;
551         ctdb->num_clients++;
552
553         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
554                                          ctdb_daemon_read_cb, client);
555
556         talloc_set_destructor(client, ctdb_client_destructor);
557 }
558
559
560
561 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
562                          uint16_t flags, void *private_data)
563 {
564         int *fd = private_data;
565         int cnt;
566         char buf;
567
568         /* XXX this is a good place to try doing some cleaning up before exiting */
569         cnt = read(*fd, &buf, 1);
570         if (cnt==0) {
571                 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
572                 exit(1);
573         } else {
574                 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
575                 exit(1);
576         }
577 }
578
579
580
581 /*
582   create a unix domain socket and bind it
583   return a file descriptor open on the socket 
584 */
585 static int ux_socket_bind(struct ctdb_context *ctdb)
586 {
587         struct sockaddr_un addr;
588
589         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
590         if (ctdb->daemon.sd == -1) {
591                 ctdb->daemon.sd = -1;
592                 return -1;
593         }
594
595         set_non_blocking(ctdb->daemon.sd);
596
597         memset(&addr, 0, sizeof(addr));
598         addr.sun_family = AF_UNIX;
599         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
600
601         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
602                 close(ctdb->daemon.sd);
603                 ctdb->daemon.sd = -1;
604                 return -1;
605         }       
606         listen(ctdb->daemon.sd, 1);
607
608         return 0;
609 }
610
611 /*
612   delete the socket on exit - called on destruction of autofree context
613  */
614 static int unlink_destructor(const char *name)
615 {
616         unlink(name);
617         return 0;
618 }
619
620 /*
621   start the protocol going
622 */
623 int ctdb_start(struct ctdb_context *ctdb)
624 {
625         pid_t pid;
626         static int fd[2];
627         int res;
628         struct fd_event *fde;
629         const char *domain_socket_name;
630
631         /* get rid of any old sockets */
632         unlink(ctdb->daemon.name);
633
634         /* create a unix domain stream socket to listen to */
635         res = ux_socket_bind(ctdb);
636         if (res!=0) {
637                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
638                 exit(10);
639         }
640
641         res = pipe(&fd[0]);
642         if (res) {
643                 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
644                 exit(1);
645         }
646         pid = fork();
647         if (pid==-1) {
648                 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
649                 exit(1);
650         }
651
652         if (pid) {
653                 close(fd[0]);
654                 close(ctdb->daemon.sd);
655                 ctdb->daemon.sd = -1;
656                 ctdb_ctrl_get_config(ctdb);
657                 return 0;
658         }
659
660         block_signal(SIGPIPE);
661
662         /* ensure the socket is deleted on exit of the daemon */
663         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
664         talloc_set_destructor(domain_socket_name, unlink_destructor);   
665         
666         close(fd[1]);
667
668
669         ctdb->ev = event_context_init(NULL);
670         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
671         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
672         ctdb_main_loop(ctdb);
673
674         return 0;
675 }
676
677
678 /*
679   start the protocol going as a daemon
680 */
681 int ctdb_start_daemon(struct ctdb_context *ctdb)
682 {
683         int res;
684         struct fd_event *fde;
685         const char *domain_socket_name;
686
687         /* get rid of any old sockets */
688         unlink(ctdb->daemon.name);
689
690         /* create a unix domain stream socket to listen to */
691         res = ux_socket_bind(ctdb);
692         if (res!=0) {
693                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
694                 exit(10);
695         }
696
697         if (fork()) {
698                 return 0;
699         }
700
701         tdb_reopen_all(False);
702
703         setsid();
704         block_signal(SIGPIPE);
705         block_signal(SIGCHLD);
706
707         /* ensure the socket is deleted on exit of the daemon */
708         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
709         talloc_set_destructor(domain_socket_name, unlink_destructor);   
710
711         ctdb->ev = event_context_init(NULL);
712         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, 
713                            ctdb_accept_client, ctdb);
714         ctdb_main_loop(ctdb);
715
716         return 0;
717 }
718
719 /*
720   allocate a packet for use in client<->daemon communication
721  */
722 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
723                                             TALLOC_CTX *mem_ctx, 
724                                             enum ctdb_operation operation, 
725                                             size_t length, size_t slength,
726                                             const char *type)
727 {
728         int size;
729         struct ctdb_req_header *hdr;
730         size = ((length+1)+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
731
732         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
733         if (hdr == NULL) {
734                 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
735                          operation, length));
736                 return NULL;
737         }
738         talloc_set_name_const(hdr, type);
739         memset(hdr, 0, size);
740         hdr->operation    = operation;
741         hdr->length       = size;
742         hdr->ctdb_magic   = CTDB_MAGIC;
743         hdr->ctdb_version = CTDB_VERSION;
744         hdr->srcnode      = ctdb->vnn;
745         if (ctdb->vnn_map) {
746                 hdr->generation = ctdb->vnn_map->generation;
747         }
748
749         return hdr;
750 }
751
752
753 /*
754   allocate a packet for use in daemon<->daemon communication
755  */
756 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
757                                                  TALLOC_CTX *mem_ctx, 
758                                                  enum ctdb_operation operation, 
759                                                  size_t length, size_t slength,
760                                                  const char *type)
761 {
762         int size;
763         struct ctdb_req_header *hdr;
764         size = ((length+1)+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
765         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
766         if (hdr == NULL) {
767                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
768                          operation, length));
769                 return NULL;
770         }
771         talloc_set_name_const(hdr, type);
772         memset(hdr, 0, size);
773         hdr->operation    = operation;
774         hdr->length       = size;
775         hdr->ctdb_magic   = CTDB_MAGIC;
776         hdr->ctdb_version = CTDB_VERSION;
777         hdr->generation   = ctdb->vnn_map->generation;
778         hdr->srcnode      = ctdb->vnn;
779
780         return hdr;     
781 }
782
783 /*
784   called when a CTDB_REQ_FINISHED packet comes in
785 */
786 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
787 {
788         ctdb->num_finished++;
789 }
790
791
792 struct daemon_control_state {
793         struct ctdb_client *client;
794         struct ctdb_req_control *c;
795         uint32_t reqid;
796 };
797
798 /*
799   callback when a control reply comes in
800  */
801 static void daemon_control_callback(struct ctdb_context *ctdb,
802                                     uint32_t status, TDB_DATA data, 
803                                     void *private_data)
804 {
805         struct daemon_control_state *state = talloc_get_type(private_data, 
806                                                              struct daemon_control_state);
807         struct ctdb_client *client = state->client;
808         struct ctdb_reply_control *r;
809         size_t len;
810
811         /* construct a message to send to the client containing the data */
812         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
813         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
814                                struct ctdb_reply_control);
815         CTDB_NO_MEMORY_VOID(ctdb, r);
816
817         r->hdr.reqid     = state->reqid;
818         r->status        = status;
819         r->datalen       = data.dsize;
820         memcpy(&r->data[0], data.dptr, data.dsize);
821
822         daemon_queue_send(client, &r->hdr);
823
824         talloc_free(state);
825 }
826
827 /*
828   this is called when the ctdb daemon received a ctdb request control
829   from a local client over the unix domain socket
830  */
831 static void daemon_request_control_from_client(struct ctdb_client *client, 
832                                                struct ctdb_req_control *c)
833 {
834         TDB_DATA data;
835         int res;
836         struct daemon_control_state *state;
837
838         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
839                 c->hdr.destnode = client->ctdb->vnn;
840         }
841
842         state = talloc(client, struct daemon_control_state);
843         CTDB_NO_MEMORY_VOID(client->ctdb, state);
844
845         state->client = client;
846         state->c = talloc_steal(state, c);
847         state->reqid = c->hdr.reqid;
848         
849         data.dptr = &c->data[0];
850         data.dsize = c->datalen;
851         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
852                                        c->srvid, c->opcode, c->flags,
853                                        data, daemon_control_callback,
854                                        state);
855         if (res != 0) {
856                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
857                          c->hdr.destnode));
858         }
859 }
860
861 /*
862   register a call function
863 */
864 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
865                          ctdb_fn_t fn, int id)
866 {
867         struct ctdb_registered_call *call;
868         struct ctdb_db_context *ctdb_db;
869
870         ctdb_db = find_ctdb_db(ctdb, db_id);
871         if (ctdb_db == NULL) {
872                 return -1;
873         }
874
875         call = talloc(ctdb_db, struct ctdb_registered_call);
876         call->fn = fn;
877         call->id = id;
878
879         DLIST_ADD(ctdb_db->calls, call);        
880         return 0;
881 }