Use single copy of tdb in both samba3 and samba4.
[samba.git] / source4 / cluster / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "../tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb.h"
28 #include "../include/ctdb_private.h"
29
30 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
31
32 /*
33   handler for when a node changes its flags
34 */
35 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
36                                 TDB_DATA data, void *private_data)
37 {
38         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
39
40         if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) {
41                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
42                 return;
43         }
44
45         if (!ctdb_validate_vnn(ctdb, c->vnn)) {
46                 DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
47                 return;
48         }
49
50         /* don't get the disconnected flag from the other node */
51         ctdb->nodes[c->vnn]->flags = 
52                 (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
53                 | (c->flags & ~NODE_FLAGS_DISCONNECTED);        
54         DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));
55
56         /* make sure we don't hold any IPs when we shouldn't */
57         if (c->vnn == ctdb->vnn &&
58             (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
59                 ctdb_release_all_ips(ctdb);
60         }
61 }
62
63 /* called when the "startup" event script has finished */
64 static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
65 {
66         if (status != 0) {
67                 DEBUG(0,("startup event failed!\n"));
68                 ctdb_fatal(ctdb, "startup event script failed");                
69         }
70
71         /* start the transport running */
72         if (ctdb->methods->start(ctdb) != 0) {
73                 DEBUG(0,("transport failed to start!\n"));
74                 ctdb_fatal(ctdb, "transport failed to start");
75         }
76
77         /* start the recovery daemon process */
78         if (ctdb_start_recoverd(ctdb) != 0) {
79                 DEBUG(0,("Failed to start recovery daemon\n"));
80                 exit(11);
81         }
82
83         /* a handler for when nodes are disabled/enabled */
84         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
85                                       flag_change_handler, NULL);
86
87         /* start monitoring for dead nodes */
88         ctdb_start_monitoring(ctdb);
89 }
90
91 /* go into main ctdb loop */
92 static void ctdb_main_loop(struct ctdb_context *ctdb)
93 {
94         int ret = -1;
95
96         if (strcmp(ctdb->transport, "tcp") == 0) {
97                 int ctdb_tcp_init(struct ctdb_context *);
98                 ret = ctdb_tcp_init(ctdb);
99         }
100 #ifdef USE_INFINIBAND
101         if (strcmp(ctdb->transport, "ib") == 0) {
102                 int ctdb_ibw_init(struct ctdb_context *);
103                 ret = ctdb_ibw_init(ctdb);
104         }
105 #endif
106         if (ret != 0) {
107                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
108                 return;
109         }
110
111         /* initialise the transport  */
112         if (ctdb->methods->initialise(ctdb) != 0) {
113                 DEBUG(0,("transport failed to initialise!\n"));
114                 ctdb_fatal(ctdb, "transport failed to initialise");
115         }
116
117         /* tell all other nodes we've just started up */
118         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
119                                  0, CTDB_CONTROL_STARTUP, 0,
120                                  CTDB_CTRL_FLAG_NOREPLY,
121                                  tdb_null, NULL, NULL);
122
123         /* release any IPs we hold from previous runs of the daemon */
124         ctdb_release_all_ips(ctdb);
125
126         ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
127                                          ctdb_start_transport, NULL, "startup");
128         if (ret != 0) {
129                 DEBUG(0,("Failed startup event script\n"));
130                 return;
131         }
132
133         /* go into a wait loop to allow other nodes to complete */
134         event_loop_wait(ctdb->ev);
135
136         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
137         exit(1);
138 }
139
140
141 static void block_signal(int signum)
142 {
143         struct sigaction act;
144
145         memset(&act, 0, sizeof(act));
146
147         act.sa_handler = SIG_IGN;
148         sigemptyset(&act.sa_mask);
149         sigaddset(&act.sa_mask, signum);
150         sigaction(signum, &act, NULL);
151 }
152
153
154 /*
155   send a packet to a client
156  */
157 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
158 {
159         client->ctdb->statistics.client_packets_sent++;
160         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
161 }
162
163 /*
164   message handler for when we are in daemon mode. This redirects the message
165   to the right client
166  */
167 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
168                                     TDB_DATA data, void *private_data)
169 {
170         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
171         struct ctdb_req_message *r;
172         int len;
173
174         /* construct a message to send to the client containing the data */
175         len = offsetof(struct ctdb_req_message, data) + data.dsize;
176         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
177                                len, struct ctdb_req_message);
178         CTDB_NO_MEMORY_VOID(ctdb, r);
179
180         talloc_set_name_const(r, "req_message packet");
181
182         r->srvid         = srvid;
183         r->datalen       = data.dsize;
184         memcpy(&r->data[0], data.dptr, data.dsize);
185
186         daemon_queue_send(client, &r->hdr);
187
188         talloc_free(r);
189 }
190                                            
191
192 /*
193   this is called when the ctdb daemon received a ctdb request to 
194   set the srvid from the client
195  */
196 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
197 {
198         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
199         int res;
200         if (client == NULL) {
201                 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
202                 return -1;
203         }
204         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
205         if (res != 0) {
206                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
207                          (unsigned long long)srvid));
208         } else {
209                 DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
210                          (unsigned long long)srvid));
211         }
212
213         /* this is a hack for Samba - we now know the pid of the Samba client */
214         if ((srvid & 0xFFFFFFFF) == srvid &&
215             kill(srvid, 0) == 0) {
216                 client->pid = srvid;
217                 DEBUG(0,(__location__ " Registered PID %u for client %u\n",
218                          (unsigned)client->pid, client_id));
219         }
220         return res;
221 }
222
223 /*
224   this is called when the ctdb daemon received a ctdb request to 
225   remove a srvid from the client
226  */
227 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
228 {
229         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
230         if (client == NULL) {
231                 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
232                 return -1;
233         }
234         return ctdb_deregister_message_handler(ctdb, srvid, client);
235 }
236
237
238 /*
239   destroy a ctdb_client
240 */
241 static int ctdb_client_destructor(struct ctdb_client *client)
242 {
243         ctdb_takeover_client_destructor_hook(client);
244         ctdb_reqid_remove(client->ctdb, client->client_id);
245         client->ctdb->statistics.num_clients--;
246         return 0;
247 }
248
249
250 /*
251   this is called when the ctdb daemon received a ctdb request message
252   from a local client over the unix domain socket
253  */
254 static void daemon_request_message_from_client(struct ctdb_client *client, 
255                                                struct ctdb_req_message *c)
256 {
257         TDB_DATA data;
258         int res;
259
260         /* maybe the message is for another client on this node */
261         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
262                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
263                 return;
264         }
265
266         /* its for a remote node */
267         data.dptr = &c->data[0];
268         data.dsize = c->datalen;
269         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
270                                        c->srvid, data);
271         if (res != 0) {
272                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
273                          c->hdr.destnode));
274         }
275 }
276
277
278 struct daemon_call_state {
279         struct ctdb_client *client;
280         uint32_t reqid;
281         struct ctdb_call *call;
282         struct timeval start_time;
283 };
284
285 /* 
286    complete a call from a client 
287 */
288 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
289 {
290         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
291                                                            struct daemon_call_state);
292         struct ctdb_reply_call *r;
293         int res;
294         uint32_t length;
295         struct ctdb_client *client = dstate->client;
296
297         talloc_steal(client, dstate);
298         talloc_steal(dstate, dstate->call);
299
300         res = ctdb_daemon_call_recv(state, dstate->call);
301         if (res != 0) {
302                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
303                 client->ctdb->statistics.pending_calls--;
304                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
305                 return;
306         }
307
308         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
309         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
310                                length, struct ctdb_reply_call);
311         if (r == NULL) {
312                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
313                 client->ctdb->statistics.pending_calls--;
314                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
315                 return;
316         }
317         r->hdr.reqid        = dstate->reqid;
318         r->datalen          = dstate->call->reply_data.dsize;
319         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
320
321         res = daemon_queue_send(client, &r->hdr);
322         if (res != 0) {
323                 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
324         }
325         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
326         talloc_free(dstate);
327         client->ctdb->statistics.pending_calls--;
328 }
329
330
331 static void daemon_request_call_from_client(struct ctdb_client *client, 
332                                             struct ctdb_req_call *c);
333
334 /*
335   this is called when the ctdb daemon received a ctdb request call
336   from a local client over the unix domain socket
337  */
338 static void daemon_request_call_from_client(struct ctdb_client *client, 
339                                             struct ctdb_req_call *c)
340 {
341         struct ctdb_call_state *state;
342         struct ctdb_db_context *ctdb_db;
343         struct daemon_call_state *dstate;
344         struct ctdb_call *call;
345         struct ctdb_ltdb_header header;
346         TDB_DATA key, data;
347         int ret;
348         struct ctdb_context *ctdb = client->ctdb;
349
350         ctdb->statistics.total_calls++;
351         ctdb->statistics.pending_calls++;
352
353         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
354         if (!ctdb_db) {
355                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
356                           c->db_id));
357                 ctdb->statistics.pending_calls--;
358                 return;
359         }
360
361         key.dptr = c->data;
362         key.dsize = c->keylen;
363
364         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
365                                            (struct ctdb_req_header *)c, &data,
366                                            daemon_incoming_packet, client, True);
367         if (ret == -2) {
368                 /* will retry later */
369                 ctdb->statistics.pending_calls--;
370                 return;
371         }
372
373         if (ret != 0) {
374                 DEBUG(0,(__location__ " Unable to fetch record\n"));
375                 ctdb->statistics.pending_calls--;
376                 return;
377         }
378
379         dstate = talloc(client, struct daemon_call_state);
380         if (dstate == NULL) {
381                 ctdb_ltdb_unlock(ctdb_db, key);
382                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
383                 ctdb->statistics.pending_calls--;
384                 return;
385         }
386         dstate->start_time = timeval_current();
387         dstate->client = client;
388         dstate->reqid  = c->hdr.reqid;
389         talloc_steal(dstate, data.dptr);
390
391         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
392         if (call == NULL) {
393                 ctdb_ltdb_unlock(ctdb_db, key);
394                 DEBUG(0,(__location__ " Unable to allocate call\n"));
395                 ctdb->statistics.pending_calls--;
396                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
397                 return;
398         }
399
400         call->call_id = c->callid;
401         call->key = key;
402         call->call_data.dptr = c->data + c->keylen;
403         call->call_data.dsize = c->calldatalen;
404         call->flags = c->flags;
405
406         if (header.dmaster == ctdb->vnn) {
407                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
408         } else {
409                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, key);
413
414         if (state == NULL) {
415                 DEBUG(0,(__location__ " Unable to setup call send\n"));
416                 ctdb->statistics.pending_calls--;
417                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
418                 return;
419         }
420         talloc_steal(state, dstate);
421         talloc_steal(client, state);
422
423         state->async.fn = daemon_call_from_client_callback;
424         state->async.private_data = dstate;
425 }
426
427
428 static void daemon_request_control_from_client(struct ctdb_client *client, 
429                                                struct ctdb_req_control *c);
430
431 /* data contains a packet from the client */
432 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
433 {
434         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
435         TALLOC_CTX *tmp_ctx;
436         struct ctdb_context *ctdb = client->ctdb;
437
438         /* place the packet as a child of a tmp_ctx. We then use
439            talloc_free() below to free it. If any of the calls want
440            to keep it, then they will steal it somewhere else, and the
441            talloc_free() will be a no-op */
442         tmp_ctx = talloc_new(client);
443         talloc_steal(tmp_ctx, hdr);
444
445         if (hdr->ctdb_magic != CTDB_MAGIC) {
446                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
447                 goto done;
448         }
449
450         if (hdr->ctdb_version != CTDB_VERSION) {
451                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
452                 goto done;
453         }
454
455         switch (hdr->operation) {
456         case CTDB_REQ_CALL:
457                 ctdb->statistics.client.req_call++;
458                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
459                 break;
460
461         case CTDB_REQ_MESSAGE:
462                 ctdb->statistics.client.req_message++;
463                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
464                 break;
465
466         case CTDB_REQ_CONTROL:
467                 ctdb->statistics.client.req_control++;
468                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
469                 break;
470
471         default:
472                 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
473                          hdr->operation));
474         }
475
476 done:
477         talloc_free(tmp_ctx);
478 }
479
480 /*
481   called when the daemon gets a incoming packet
482  */
483 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
484 {
485         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
486         struct ctdb_req_header *hdr;
487
488         if (cnt == 0) {
489                 talloc_free(client);
490                 return;
491         }
492
493         client->ctdb->statistics.client_packets_recv++;
494
495         if (cnt < sizeof(*hdr)) {
496                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
497                                (unsigned)cnt);
498                 return;
499         }
500         hdr = (struct ctdb_req_header *)data;
501         if (cnt != hdr->length) {
502                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
503                                (unsigned)hdr->length, (unsigned)cnt);
504                 return;
505         }
506
507         if (hdr->ctdb_magic != CTDB_MAGIC) {
508                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
509                 return;
510         }
511
512         if (hdr->ctdb_version != CTDB_VERSION) {
513                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
514                 return;
515         }
516
517         DEBUG(3,(__location__ " client request %u of type %u length %u from "
518                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
519                  hdr->srcnode, hdr->destnode));
520
521         /* it is the responsibility of the incoming packet function to free 'data' */
522         daemon_incoming_packet(client, hdr);
523 }
524
525 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
526                          uint16_t flags, void *private_data)
527 {
528         struct sockaddr_in addr;
529         socklen_t len;
530         int fd;
531         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
532         struct ctdb_client *client;
533
534         memset(&addr, 0, sizeof(addr));
535         len = sizeof(addr);
536         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
537         if (fd == -1) {
538                 return;
539         }
540
541         set_nonblocking(fd);
542         set_close_on_exec(fd);
543
544         client = talloc_zero(ctdb, struct ctdb_client);
545         client->ctdb = ctdb;
546         client->fd = fd;
547         client->client_id = ctdb_reqid_new(ctdb, client);
548         ctdb->statistics.num_clients++;
549
550         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
551                                          ctdb_daemon_read_cb, client);
552
553         talloc_set_destructor(client, ctdb_client_destructor);
554 }
555
556
557
558 /*
559   create a unix domain socket and bind it
560   return a file descriptor open on the socket 
561 */
562 static int ux_socket_bind(struct ctdb_context *ctdb)
563 {
564         struct sockaddr_un addr;
565
566         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
567         if (ctdb->daemon.sd == -1) {
568                 return -1;
569         }
570
571         set_nonblocking(ctdb->daemon.sd);
572         set_close_on_exec(ctdb->daemon.sd);
573
574 #if 0
575         /* AIX doesn't like this :( */
576         if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
577             fchmod(ctdb->daemon.sd, 0700) != 0) {
578                 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
579                 goto failed;
580         }
581 #endif
582
583         set_nonblocking(ctdb->daemon.sd);
584
585         memset(&addr, 0, sizeof(addr));
586         addr.sun_family = AF_UNIX;
587         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
588
589         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
590                 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
591                 goto failed;
592         }       
593         if (listen(ctdb->daemon.sd, 10) != 0) {
594                 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
595                 goto failed;
596         }
597
598         return 0;
599
600 failed:
601         close(ctdb->daemon.sd);
602         ctdb->daemon.sd = -1;
603         return -1;      
604 }
605
606 /*
607   delete the socket on exit - called on destruction of autofree context
608  */
609 static int unlink_destructor(const char *name)
610 {
611         unlink(name);
612         return 0;
613 }
614
615
616 /*
617   start the protocol going as a daemon
618 */
619 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
620 {
621         int res;
622         struct fd_event *fde;
623         const char *domain_socket_name;
624
625         /* get rid of any old sockets */
626         unlink(ctdb->daemon.name);
627
628         /* create a unix domain stream socket to listen to */
629         res = ux_socket_bind(ctdb);
630         if (res!=0) {
631                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
632                 exit(10);
633         }
634
635         if (do_fork && fork()) {
636                 return 0;
637         }
638
639         tdb_reopen_all(False);
640
641         if (do_fork) {
642                 setsid();
643         }
644         block_signal(SIGPIPE);
645
646         /* try to set us up as realtime */
647         ctdb_set_realtime(true);
648
649         /* ensure the socket is deleted on exit of the daemon */
650         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
651         talloc_set_destructor(domain_socket_name, unlink_destructor);   
652
653         ctdb->ev = s4_event_context_init(NULL);
654
655         /* start frozen, then let the first election sort things out */
656         if (!ctdb_blocking_freeze(ctdb)) {
657                 DEBUG(0,("Failed to get initial freeze\n"));
658                 exit(12);
659         }
660
661         /* force initial recovery for election */
662         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
663
664         /* now start accepting clients, only can do this once frozen */
665         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
666                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
667                            ctdb_accept_client, ctdb);
668
669         ctdb_main_loop(ctdb);
670
671         return 0;
672 }
673
674 /*
675   allocate a packet for use in daemon<->daemon communication
676  */
677 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
678                                                  TALLOC_CTX *mem_ctx, 
679                                                  enum ctdb_operation operation, 
680                                                  size_t length, size_t slength,
681                                                  const char *type)
682 {
683         int size;
684         struct ctdb_req_header *hdr;
685
686         length = MAX(length, slength);
687         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
688
689         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
690         if (hdr == NULL) {
691                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
692                          operation, (unsigned)length));
693                 return NULL;
694         }
695         talloc_set_name_const(hdr, type);
696         memset(hdr, 0, slength);
697         hdr->length       = length;
698         hdr->operation    = operation;
699         hdr->ctdb_magic   = CTDB_MAGIC;
700         hdr->ctdb_version = CTDB_VERSION;
701         hdr->generation   = ctdb->vnn_map->generation;
702         hdr->srcnode      = ctdb->vnn;
703
704         return hdr;     
705 }
706
707 struct daemon_control_state {
708         struct daemon_control_state *next, *prev;
709         struct ctdb_client *client;
710         struct ctdb_req_control *c;
711         uint32_t reqid;
712         struct ctdb_node *node;
713 };
714
715 /*
716   callback when a control reply comes in
717  */
718 static void daemon_control_callback(struct ctdb_context *ctdb,
719                                     int32_t status, TDB_DATA data, 
720                                     const char *errormsg,
721                                     void *private_data)
722 {
723         struct daemon_control_state *state = talloc_get_type(private_data, 
724                                                              struct daemon_control_state);
725         struct ctdb_client *client = state->client;
726         struct ctdb_reply_control *r;
727         size_t len;
728
729         /* construct a message to send to the client containing the data */
730         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
731         if (errormsg) {
732                 len += strlen(errormsg);
733         }
734         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
735                                struct ctdb_reply_control);
736         CTDB_NO_MEMORY_VOID(ctdb, r);
737
738         r->hdr.reqid     = state->reqid;
739         r->status        = status;
740         r->datalen       = data.dsize;
741         r->errorlen = 0;
742         memcpy(&r->data[0], data.dptr, data.dsize);
743         if (errormsg) {
744                 r->errorlen = strlen(errormsg);
745                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
746         }
747
748         daemon_queue_send(client, &r->hdr);
749
750         talloc_free(state);
751 }
752
753 /*
754   fail all pending controls to a disconnected node
755  */
756 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
757 {
758         struct daemon_control_state *state;
759         while ((state = node->pending_controls)) {
760                 DLIST_REMOVE(node->pending_controls, state);
761                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
762                                         "node is disconnected", state);
763         }
764 }
765
766 /*
767   destroy a daemon_control_state
768  */
769 static int daemon_control_destructor(struct daemon_control_state *state)
770 {
771         if (state->node) {
772                 DLIST_REMOVE(state->node->pending_controls, state);
773         }
774         return 0;
775 }
776
777 /*
778   this is called when the ctdb daemon received a ctdb request control
779   from a local client over the unix domain socket
780  */
781 static void daemon_request_control_from_client(struct ctdb_client *client, 
782                                                struct ctdb_req_control *c)
783 {
784         TDB_DATA data;
785         int res;
786         struct daemon_control_state *state;
787         TALLOC_CTX *tmp_ctx = talloc_new(client);
788
789         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
790                 c->hdr.destnode = client->ctdb->vnn;
791         }
792
793         state = talloc(client, struct daemon_control_state);
794         CTDB_NO_MEMORY_VOID(client->ctdb, state);
795
796         state->client = client;
797         state->c = talloc_steal(state, c);
798         state->reqid = c->hdr.reqid;
799         if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) {
800                 state->node = client->ctdb->nodes[c->hdr.destnode];
801                 DLIST_ADD(state->node->pending_controls, state);
802         } else {
803                 state->node = NULL;
804         }
805
806         talloc_set_destructor(state, daemon_control_destructor);
807
808         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
809                 talloc_steal(tmp_ctx, state);
810         }
811         
812         data.dptr = &c->data[0];
813         data.dsize = c->datalen;
814         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
815                                        c->srvid, c->opcode, client->client_id,
816                                        c->flags,
817                                        data, daemon_control_callback,
818                                        state);
819         if (res != 0) {
820                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
821                          c->hdr.destnode));
822         }
823
824         talloc_free(tmp_ctx);
825 }
826
827 /*
828   register a call function
829 */
830 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
831                          ctdb_fn_t fn, int id)
832 {
833         struct ctdb_registered_call *call;
834         struct ctdb_db_context *ctdb_db;
835
836         ctdb_db = find_ctdb_db(ctdb, db_id);
837         if (ctdb_db == NULL) {
838                 return -1;
839         }
840
841         call = talloc(ctdb_db, struct ctdb_registered_call);
842         call->fn = fn;
843         call->id = id;
844
845         DLIST_ADD(ctdb_db->calls, call);        
846         return 0;
847 }
848
849
850
851 /*
852   this local messaging handler is ugly, but is needed to prevent
853   recursion in ctdb_send_message() when the destination node is the
854   same as the source node
855  */
856 struct ctdb_local_message {
857         struct ctdb_context *ctdb;
858         uint64_t srvid;
859         TDB_DATA data;
860 };
861
862 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
863                                        struct timeval t, void *private_data)
864 {
865         struct ctdb_local_message *m = talloc_get_type(private_data, 
866                                                        struct ctdb_local_message);
867         int res;
868
869         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
870         if (res != 0) {
871                 DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", 
872                           (unsigned long long)m->srvid));
873         }
874         talloc_free(m);
875 }
876
877 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
878 {
879         struct ctdb_local_message *m;
880         m = talloc(ctdb, struct ctdb_local_message);
881         CTDB_NO_MEMORY(ctdb, m);
882
883         m->ctdb = ctdb;
884         m->srvid = srvid;
885         m->data  = data;
886         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
887         if (m->data.dptr == NULL) {
888                 talloc_free(m);
889                 return -1;
890         }
891
892         /* this needs to be done as an event to prevent recursion */
893         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
894         return 0;
895 }
896
897 /*
898   send a ctdb message
899 */
900 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
901                              uint64_t srvid, TDB_DATA data)
902 {
903         struct ctdb_req_message *r;
904         int len;
905
906         /* see if this is a message to ourselves */
907         if (vnn == ctdb->vnn) {
908                 return ctdb_local_message(ctdb, srvid, data);
909         }
910
911         len = offsetof(struct ctdb_req_message, data) + data.dsize;
912         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
913                                     struct ctdb_req_message);
914         CTDB_NO_MEMORY(ctdb, r);
915
916         r->hdr.destnode  = vnn;
917         r->srvid         = srvid;
918         r->datalen       = data.dsize;
919         memcpy(&r->data[0], data.dptr, data.dsize);
920
921         ctdb_queue_packet(ctdb, &r->hdr);
922
923         talloc_free(r);
924         return 0;
925 }
926