got rid of the getdbpath call
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   structure describing a connected client in the daemon
34  */
35 struct ctdb_client {
36         struct ctdb_context *ctdb;
37         int fd;
38         struct ctdb_queue *queue;
39 };
40
41
42
43 static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
44
45 static void ctdb_main_loop(struct ctdb_context *ctdb)
46 {
47         int ret = -1;
48
49         if (strcmp(ctdb->transport, "tcp") == 0) {
50                 int ctdb_tcp_init(struct ctdb_context *);
51                 ret = ctdb_tcp_init(ctdb);
52         }
53 #ifdef USE_INFINIBAND
54         if (strcmp(ctdb->transport, "ib") == 0) {
55                 int ctdb_ibw_init(struct ctdb_context *);
56                 ret = ctdb_ibw_init(ctdb);
57         }
58 #endif
59         if (ret != 0) {
60                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
61                 return;
62         }
63
64         /* start the transport running */
65         ctdb->methods->start(ctdb);
66
67         /* go into a wait loop to allow other nodes to complete */
68         event_loop_wait(ctdb->ev);
69
70         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
71         exit(1);
72 }
73
74
75 static void set_non_blocking(int fd)
76 {
77         unsigned v;
78         v = fcntl(fd, F_GETFL, 0);
79         fcntl(fd, F_SETFL, v | O_NONBLOCK);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->status.client_packets_sent++;
101         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
102 }
103
104 /*
105   message handler for when we are in daemon mode. This redirects the message
106   to the right client
107  */
108 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
109                                     TDB_DATA data, void *private_data)
110 {
111         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
112         struct ctdb_req_message *r;
113         int len;
114
115         /* construct a message to send to the client containing the data */
116         len = offsetof(struct ctdb_req_message, data) + data.dsize;
117         r = ctdbd_allocate_pkt(ctdb, len);
118
119         talloc_set_name_const(r, "req_message packet");
120
121         memset(r, 0, offsetof(struct ctdb_req_message, data));
122
123         r->hdr.length    = len;
124         r->hdr.ctdb_magic = CTDB_MAGIC;
125         r->hdr.ctdb_version = CTDB_VERSION;
126         r->hdr.operation = CTDB_REQ_MESSAGE;
127         r->srvid         = srvid;
128         r->datalen       = data.dsize;
129         memcpy(&r->data[0], data.dptr, data.dsize);
130
131         daemon_queue_send(client, &r->hdr);
132
133         talloc_free(r);
134 }
135                                            
136
137 /*
138   this is called when the ctdb daemon received a ctdb request to 
139   set the srvid from the client
140  */
141 static void daemon_request_register_message_handler(struct ctdb_client *client, 
142                                                     struct ctdb_req_register *c)
143 {
144         int res;
145         res = ctdb_register_message_handler(client->ctdb, client, 
146                                             c->srvid, daemon_message_handler, 
147                                             client);
148         if (res != 0) {
149                 DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", 
150                          c->srvid));
151         } else {
152                 DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", 
153                          c->srvid));
154         }
155 }
156
157
158 /*
159   called when the daemon gets a shutdown request from a client
160  */
161 static void daemon_request_shutdown(struct ctdb_client *client, 
162                                       struct ctdb_req_shutdown *f)
163 {
164         struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
165         int len;
166         uint32_t node;
167
168         /* we dont send to ourself so we can already count one daemon as
169            exiting */
170         ctdb->num_finished++;
171
172
173         /* loop over all nodes of the cluster */
174         for (node=0; node<ctdb->num_nodes;node++) {
175                 struct ctdb_req_finished *rf;
176
177                 /* dont send a message to ourself */
178                 if (ctdb->vnn == node) {
179                         continue;
180                 }
181
182                 len = sizeof(struct ctdb_req_finished);
183                 rf = ctdb->methods->allocate_pkt(ctdb, len);
184                 CTDB_NO_MEMORY_FATAL(ctdb, rf);
185                 talloc_set_name_const(rf, "ctdb_req_finished packet");
186
187                 ZERO_STRUCT(*rf);
188                 rf->hdr.length    = len;
189                 rf->hdr.ctdb_magic = CTDB_MAGIC;
190                 rf->hdr.ctdb_version = CTDB_VERSION;
191                 rf->hdr.operation = CTDB_REQ_FINISHED;
192                 rf->hdr.destnode  = node;
193                 rf->hdr.srcnode   = ctdb->vnn;
194                 rf->hdr.reqid     = 0;
195
196                 ctdb_queue_packet(ctdb, &(rf->hdr));
197
198                 talloc_free(rf);
199         }
200
201         /* wait until all nodes have are prepared to shutdown */
202         while (ctdb->num_finished != ctdb->num_nodes) {
203                 event_loop_once(ctdb->ev);
204         }
205
206         /* all daemons have requested to finish - we now exit */
207         DEBUG(1,("All daemons finished - exiting\n"));
208         _exit(0);
209 }
210
211
212
213 /*
214   called when the daemon gets a connect wait request from a client
215  */
216 static void daemon_request_connect_wait(struct ctdb_client *client, 
217                                         struct ctdb_req_connect_wait *c)
218 {
219         struct ctdb_reply_connect_wait r;
220         int res;
221
222         /* first wait - in the daemon */
223         ctdb_daemon_connect_wait(client->ctdb);
224
225         /* now send the reply */
226         ZERO_STRUCT(r);
227
228         r.hdr.length     = sizeof(r);
229         r.hdr.ctdb_magic = CTDB_MAGIC;
230         r.hdr.ctdb_version = CTDB_VERSION;
231         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
232         r.vnn           = ctdb_get_vnn(client->ctdb);
233         r.num_connected = client->ctdb->num_connected;
234         
235         res = daemon_queue_send(client, &r.hdr);
236         if (res != 0) {
237                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
238                 return;
239         }
240 }
241
242
243 /*
244   destroy a ctdb_client
245 */
246 static int ctdb_client_destructor(struct ctdb_client *client)
247 {
248         close(client->fd);
249         client->fd = -1;
250         return 0;
251 }
252
253
254 /*
255   this is called when the ctdb daemon received a ctdb request message
256   from a local client over the unix domain socket
257  */
258 static void daemon_request_message_from_client(struct ctdb_client *client, 
259                                                struct ctdb_req_message *c)
260 {
261         TDB_DATA data;
262         int res;
263
264         /* maybe the message is for another client on this node */
265         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
266                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
267                 return;
268         }
269         
270         /* its for a remote node */
271         data.dptr = &c->data[0];
272         data.dsize = c->datalen;
273         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
274                                        c->srvid, data);
275         if (res != 0) {
276                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
277                          c->hdr.destnode));
278         }
279 }
280
281
282 struct daemon_call_state {
283         struct ctdb_client *client;
284         uint32_t reqid;
285         struct ctdb_call *call;
286         struct timeval start_time;
287 };
288
289 /* 
290    complete a call from a client 
291 */
292 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
293 {
294         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
295                                                            struct daemon_call_state);
296         struct ctdb_reply_call *r;
297         int res;
298         uint32_t length;
299         struct ctdb_client *client = dstate->client;
300
301         talloc_steal(client, dstate);
302         talloc_steal(dstate, dstate->call);
303
304         res = ctdb_daemon_call_recv(state, dstate->call);
305         if (res != 0) {
306                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
307                 client->ctdb->status.pending_calls--;
308                 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
309                 return;
310         }
311
312         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
313         r = ctdbd_allocate_pkt(dstate, length);
314         if (r == NULL) {
315                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
316                 client->ctdb->status.pending_calls--;
317                 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
318                 return;
319         }
320         memset(r, 0, offsetof(struct ctdb_reply_call, data));
321         r->hdr.length       = length;
322         r->hdr.ctdb_magic   = CTDB_MAGIC;
323         r->hdr.ctdb_version = CTDB_VERSION;
324         r->hdr.operation    = CTDB_REPLY_CALL;
325         r->hdr.reqid        = dstate->reqid;
326         r->datalen          = dstate->call->reply_data.dsize;
327         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
328
329         res = daemon_queue_send(client, &r->hdr);
330         if (res != 0) {
331                 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
332         }
333         ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
334         talloc_free(dstate);
335         client->ctdb->status.pending_calls--;
336 }
337
338
339 /*
340   this is called when the ctdb daemon received a ctdb request call
341   from a local client over the unix domain socket
342  */
343 static void daemon_request_call_from_client(struct ctdb_client *client, 
344                                             struct ctdb_req_call *c)
345 {
346         struct ctdb_call_state *state;
347         struct ctdb_db_context *ctdb_db;
348         struct daemon_call_state *dstate;
349         struct ctdb_call *call;
350         struct ctdb_ltdb_header header;
351         TDB_DATA key, data;
352         int ret;
353         struct ctdb_context *ctdb = client->ctdb;
354
355         ctdb->status.total_calls++;
356         ctdb->status.pending_calls++;
357
358         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
359         if (!ctdb_db) {
360                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
361                           c->db_id));
362                 ctdb->status.pending_calls--;
363                 return;
364         }
365
366         key.dptr = c->data;
367         key.dsize = c->keylen;
368
369         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
370                                            (struct ctdb_req_header *)c, &data,
371                                            daemon_incoming_packet, client);
372         if (ret == -2) {
373                 /* will retry later */
374                 ctdb->status.pending_calls--;
375                 return;
376         }
377
378         if (ret != 0) {
379                 DEBUG(0,(__location__ " Unable to fetch record\n"));
380                 ctdb->status.pending_calls--;
381                 return;
382         }
383
384         dstate = talloc(client, struct daemon_call_state);
385         if (dstate == NULL) {
386                 ctdb_ltdb_unlock(ctdb_db, key);
387                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
388                 ctdb->status.pending_calls--;
389                 return;
390         }
391         dstate->start_time = timeval_current();
392         dstate->client = client;
393         dstate->reqid  = c->hdr.reqid;
394         talloc_steal(dstate, data.dptr);
395
396         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
397         if (call == NULL) {
398                 ctdb_ltdb_unlock(ctdb_db, key);
399                 DEBUG(0,(__location__ " Unable to allocate call\n"));
400                 ctdb->status.pending_calls--;
401                 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
402                 return;
403         }
404
405         call->call_id = c->callid;
406         call->key = key;
407         call->call_data.dptr = c->data + c->keylen;
408         call->call_data.dsize = c->calldatalen;
409         call->flags = c->flags;
410
411         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
412                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
413         } else {
414                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
415         }
416
417         ctdb_ltdb_unlock(ctdb_db, key);
418
419         if (state == NULL) {
420                 DEBUG(0,(__location__ " Unable to setup call send\n"));
421                 ctdb->status.pending_calls--;
422                 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
423                 return;
424         }
425         talloc_steal(state, dstate);
426         talloc_steal(client, state);
427
428         state->async.fn = daemon_call_from_client_callback;
429         state->async.private_data = dstate;
430 }
431
432
433 static void daemon_request_control_from_client(struct ctdb_client *client, 
434                                                struct ctdb_req_control *c);
435
436 /* data contains a packet from the client */
437 static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
438 {
439         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
440         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
441         TALLOC_CTX *tmp_ctx;
442         struct ctdb_context *ctdb = client->ctdb;
443
444         /* place the packet as a child of a tmp_ctx. We then use
445            talloc_free() below to free it. If any of the calls want
446            to keep it, then they will steal it somewhere else, and the
447            talloc_free() will be a no-op */
448         tmp_ctx = talloc_new(client);
449         talloc_steal(tmp_ctx, hdr);
450
451         if (hdr->ctdb_magic != CTDB_MAGIC) {
452                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
453                 goto done;
454         }
455
456         if (hdr->ctdb_version != CTDB_VERSION) {
457                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
458                 goto done;
459         }
460
461         switch (hdr->operation) {
462         case CTDB_REQ_CALL:
463                 ctdb->status.client.req_call++;
464                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
465                 break;
466
467         case CTDB_REQ_REGISTER:
468                 ctdb->status.client.req_register++;
469                 daemon_request_register_message_handler(client, 
470                                                         (struct ctdb_req_register *)hdr);
471                 break;
472
473         case CTDB_REQ_MESSAGE:
474                 ctdb->status.client.req_message++;
475                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
476                 break;
477
478         case CTDB_REQ_CONNECT_WAIT:
479                 ctdb->status.client.req_connect_wait++;
480                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
481                 break;
482
483         case CTDB_REQ_SHUTDOWN:
484                 ctdb->status.client.req_shutdown++;
485                 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
486                 break;
487
488         case CTDB_REQ_CONTROL:
489                 ctdb->status.client.req_control++;
490                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
491                 break;
492
493         default:
494                 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
495                          hdr->operation));
496         }
497
498 done:
499         talloc_free(tmp_ctx);
500 }
501
502 /*
503   called when the daemon gets a incoming packet
504  */
505 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
506 {
507         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
508         struct ctdb_req_header *hdr;
509
510         if (cnt == 0) {
511                 talloc_free(client);
512                 return;
513         }
514
515         client->ctdb->status.client_packets_recv++;
516
517         if (cnt < sizeof(*hdr)) {
518                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", cnt);
519                 return;
520         }
521         hdr = (struct ctdb_req_header *)data;
522         if (cnt != hdr->length) {
523                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
524                                hdr->length, cnt);
525                 return;
526         }
527
528         if (hdr->ctdb_magic != CTDB_MAGIC) {
529                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
530                 return;
531         }
532
533         if (hdr->ctdb_version != CTDB_VERSION) {
534                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
535                 return;
536         }
537
538         DEBUG(3,(__location__ " client request %d of type %d length %d from "
539                  "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
540                  hdr->srcnode, hdr->destnode));
541
542         /* it is the responsibility of the incoming packet function to free 'data' */
543         daemon_incoming_packet(client, data, cnt);
544 }
545
546 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
547                          uint16_t flags, void *private_data)
548 {
549         struct sockaddr_in addr;
550         socklen_t len;
551         int fd;
552         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
553         struct ctdb_client *client;
554
555         memset(&addr, 0, sizeof(addr));
556         len = sizeof(addr);
557         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
558         if (fd == -1) {
559                 return;
560         }
561         set_non_blocking(fd);
562
563         client = talloc_zero(ctdb, struct ctdb_client);
564         client->ctdb = ctdb;
565         client->fd = fd;
566
567         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
568                                          ctdb_daemon_read_cb, client);
569
570         talloc_set_destructor(client, ctdb_client_destructor);
571 }
572
573
574
575 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
576                          uint16_t flags, void *private_data)
577 {
578         int *fd = private_data;
579         int cnt;
580         char buf;
581
582         /* XXX this is a good place to try doing some cleaning up before exiting */
583         cnt = read(*fd, &buf, 1);
584         if (cnt==0) {
585                 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
586                 exit(1);
587         } else {
588                 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
589                 exit(1);
590         }
591 }
592
593
594
595 /*
596   create a unix domain socket and bind it
597   return a file descriptor open on the socket 
598 */
599 static int ux_socket_bind(struct ctdb_context *ctdb)
600 {
601         struct sockaddr_un addr;
602
603         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
604         if (ctdb->daemon.sd == -1) {
605                 ctdb->daemon.sd = -1;
606                 return -1;
607         }
608
609         set_non_blocking(ctdb->daemon.sd);
610
611         memset(&addr, 0, sizeof(addr));
612         addr.sun_family = AF_UNIX;
613         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
614
615         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
616                 close(ctdb->daemon.sd);
617                 ctdb->daemon.sd = -1;
618                 return -1;
619         }       
620         listen(ctdb->daemon.sd, 1);
621
622         return 0;
623 }
624
625 /*
626   delete the socket on exit - called on destruction of autofree context
627  */
628 static int unlink_destructor(const char *name)
629 {
630         unlink(name);
631         return 0;
632 }
633
634 /*
635   start the protocol going
636 */
637 int ctdb_start(struct ctdb_context *ctdb)
638 {
639         pid_t pid;
640         static int fd[2];
641         int res;
642         struct fd_event *fde;
643         const char *domain_socket_name;
644
645         /* get rid of any old sockets */
646         unlink(ctdb->daemon.name);
647
648         /* create a unix domain stream socket to listen to */
649         res = ux_socket_bind(ctdb);
650         if (res!=0) {
651                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
652                 exit(10);
653         }
654
655         res = pipe(&fd[0]);
656         if (res) {
657                 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
658                 exit(1);
659         }
660         pid = fork();
661         if (pid==-1) {
662                 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
663                 exit(1);
664         }
665
666         if (pid) {
667                 close(fd[0]);
668                 close(ctdb->daemon.sd);
669                 ctdb->daemon.sd = -1;
670                 return 0;
671         }
672
673         block_signal(SIGPIPE);
674
675         /* ensure the socket is deleted on exit of the daemon */
676         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
677         talloc_set_destructor(domain_socket_name, unlink_destructor);   
678         
679         close(fd[1]);
680
681         ctdb->ev = event_context_init(NULL);
682         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
683         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
684         ctdb_main_loop(ctdb);
685
686         return 0;
687 }
688
689 /*
690   allocate a packet for use in client<->daemon communication
691  */
692 void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
693 {
694         int size;
695
696         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
697         return talloc_size(mem_ctx, size);
698 }
699
700 /*
701   called when a CTDB_REQ_FINISHED packet comes in
702 */
703 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
704 {
705         ctdb->num_finished++;
706 }
707
708
709 struct daemon_control_state {
710         struct ctdb_client *client;
711         struct ctdb_req_control *c;
712         uint32_t reqid;
713 };
714
715 /*
716   callback when a control reply comes in
717  */
718 static void daemon_control_callback(struct ctdb_context *ctdb,
719                                     uint32_t status, TDB_DATA data, 
720                                     void *private_data)
721 {
722         struct daemon_control_state *state = talloc_get_type(private_data, 
723                                                              struct daemon_control_state);
724         struct ctdb_client *client = state->client;
725         struct ctdb_reply_control *r;
726         size_t len;
727
728         /* construct a message to send to the client containing the data */
729         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
730         r = ctdbd_allocate_pkt(client, len);
731         talloc_set_name_const(r, "reply_control packet");
732
733         memset(r, 0, offsetof(struct ctdb_reply_control, data));
734
735         r->hdr.length    = len;
736         r->hdr.ctdb_magic = CTDB_MAGIC;
737         r->hdr.ctdb_version = CTDB_VERSION;
738         r->hdr.operation = CTDB_REPLY_CONTROL;
739         r->hdr.reqid     = state->reqid;
740         r->status        = status;
741         r->datalen       = data.dsize;
742         memcpy(&r->data[0], data.dptr, data.dsize);
743
744         daemon_queue_send(client, &r->hdr);
745
746         talloc_free(state);
747 }
748
749 /*
750   this is called when the ctdb daemon received a ctdb request control
751   from a local client over the unix domain socket
752  */
753 static void daemon_request_control_from_client(struct ctdb_client *client, 
754                                                struct ctdb_req_control *c)
755 {
756         TDB_DATA data;
757         int res;
758         struct daemon_control_state *state;
759
760         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
761                 c->hdr.destnode = client->ctdb->vnn;
762         }
763
764         state = talloc(client, struct daemon_control_state);
765         CTDB_NO_MEMORY_VOID(client->ctdb, state);
766
767         state->client = client;
768         state->c = talloc_steal(state, c);
769         state->reqid = c->hdr.reqid;
770         
771         data.dptr = &c->data[0];
772         data.dsize = c->datalen;
773         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
774                                        c->srvid, c->opcode, data, daemon_control_callback,
775                                        state);
776         if (res != 0) {
777                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
778                          c->hdr.destnode));
779         }
780 }