merge from tridge
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
33
34 /*
35   handler for when a node changes its flags
36 */
37 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
38                                 TDB_DATA data, void *private_data)
39 {
40         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
41
42         if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) {
43                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
44                 return;
45         }
46
47         if (!ctdb_validate_vnn(ctdb, c->vnn)) {
48                 DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
49                 return;
50         }
51
52         /* don't get the disconnected flag from the other node */
53         ctdb->nodes[c->vnn]->flags = 
54                 (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
55                 | (c->flags & ~NODE_FLAGS_DISCONNECTED);        
56         DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));
57
58         /* make sure we don't hold any IPs when we shouldn't */
59         if (c->vnn == ctdb->vnn &&
60             (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
61                 ctdb_release_all_ips(ctdb);
62         }
63 }
64
65 /* called when the "startup" event script has finished */
66 static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
67 {
68         if (status != 0) {
69                 DEBUG(0,("startup event failed!\n"));
70                 ctdb_fatal(ctdb, "startup event script failed");                
71         }
72
73         /* start the transport running */
74         if (ctdb->methods->start(ctdb) != 0) {
75                 DEBUG(0,("transport failed to start!\n"));
76                 ctdb_fatal(ctdb, "transport failed to start");
77         }
78
79         /* start the recovery daemon process */
80         if (ctdb_start_recoverd(ctdb) != 0) {
81                 DEBUG(0,("Failed to start recovery daemon\n"));
82                 exit(11);
83         }
84
85         /* a handler for when nodes are disabled/enabled */
86         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
87                                       flag_change_handler, NULL);
88
89         /* start monitoring for dead nodes */
90         ctdb_start_monitoring(ctdb);
91 }
92
93 /* go into main ctdb loop */
94 static void ctdb_main_loop(struct ctdb_context *ctdb)
95 {
96         int ret = -1;
97
98         if (strcmp(ctdb->transport, "tcp") == 0) {
99                 int ctdb_tcp_init(struct ctdb_context *);
100                 ret = ctdb_tcp_init(ctdb);
101         }
102 #ifdef USE_INFINIBAND
103         if (strcmp(ctdb->transport, "ib") == 0) {
104                 int ctdb_ibw_init(struct ctdb_context *);
105                 ret = ctdb_ibw_init(ctdb);
106         }
107 #endif
108         if (ret != 0) {
109                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
110                 return;
111         }
112
113         /* initialise the transport  */
114         if (ctdb->methods->initialise(ctdb) != 0) {
115                 DEBUG(0,("transport failed to initialise!\n"));
116                 ctdb_fatal(ctdb, "transport failed to initialise");
117         }
118
119         /* tell all other nodes we've just started up */
120         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
121                                  0, CTDB_CONTROL_STARTUP, 0,
122                                  CTDB_CTRL_FLAG_NOREPLY,
123                                  tdb_null, NULL, NULL);
124
125         /* release any IPs we hold from previous runs of the daemon */
126         ctdb_release_all_ips(ctdb);
127
128         ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
129                                          ctdb_start_transport, NULL, "startup");
130         if (ret != 0) {
131                 DEBUG(0,("Failed startup event script\n"));
132                 return;
133         }
134
135         /* go into a wait loop to allow other nodes to complete */
136         event_loop_wait(ctdb->ev);
137
138         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
139         exit(1);
140 }
141
142
143 static void block_signal(int signum)
144 {
145         struct sigaction act;
146
147         memset(&act, 0, sizeof(act));
148
149         act.sa_handler = SIG_IGN;
150         sigemptyset(&act.sa_mask);
151         sigaddset(&act.sa_mask, signum);
152         sigaction(signum, &act, NULL);
153 }
154
155
156 /*
157   send a packet to a client
158  */
159 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
160 {
161         client->ctdb->statistics.client_packets_sent++;
162         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
163 }
164
165 /*
166   message handler for when we are in daemon mode. This redirects the message
167   to the right client
168  */
169 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
170                                     TDB_DATA data, void *private_data)
171 {
172         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
173         struct ctdb_req_message *r;
174         int len;
175
176         /* construct a message to send to the client containing the data */
177         len = offsetof(struct ctdb_req_message, data) + data.dsize;
178         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
179                                len, struct ctdb_req_message);
180         CTDB_NO_MEMORY_VOID(ctdb, r);
181
182         talloc_set_name_const(r, "req_message packet");
183
184         r->srvid         = srvid;
185         r->datalen       = data.dsize;
186         memcpy(&r->data[0], data.dptr, data.dsize);
187
188         daemon_queue_send(client, &r->hdr);
189
190         talloc_free(r);
191 }
192                                            
193
194 /*
195   this is called when the ctdb daemon received a ctdb request to 
196   set the srvid from the client
197  */
198 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 {
200         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
201         int res;
202         if (client == NULL) {
203                 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
204                 return -1;
205         }
206         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
207         if (res != 0) {
208                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
209                          (unsigned long long)srvid));
210         } else {
211                 DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
212                          (unsigned long long)srvid));
213         }
214
215         /* this is a hack for Samba - we now know the pid of the Samba client */
216         if ((srvid & 0xFFFFFFFF) == srvid &&
217             kill(srvid, 0) == 0) {
218                 client->pid = srvid;
219                 DEBUG(0,(__location__ " Registered PID %u for client %u\n",
220                          (unsigned)client->pid, client_id));
221         }
222         return res;
223 }
224
225 /*
226   this is called when the ctdb daemon received a ctdb request to 
227   remove a srvid from the client
228  */
229 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
230 {
231         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
232         if (client == NULL) {
233                 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
234                 return -1;
235         }
236         return ctdb_deregister_message_handler(ctdb, srvid, client);
237 }
238
239
240 /*
241   destroy a ctdb_client
242 */
243 static int ctdb_client_destructor(struct ctdb_client *client)
244 {
245         ctdb_takeover_client_destructor_hook(client);
246         ctdb_reqid_remove(client->ctdb, client->client_id);
247         client->ctdb->statistics.num_clients--;
248         return 0;
249 }
250
251
252 /*
253   this is called when the ctdb daemon received a ctdb request message
254   from a local client over the unix domain socket
255  */
256 static void daemon_request_message_from_client(struct ctdb_client *client, 
257                                                struct ctdb_req_message *c)
258 {
259         TDB_DATA data;
260         int res;
261
262         /* maybe the message is for another client on this node */
263         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
264                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
265                 return;
266         }
267
268         /* its for a remote node */
269         data.dptr = &c->data[0];
270         data.dsize = c->datalen;
271         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
272                                        c->srvid, data);
273         if (res != 0) {
274                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
275                          c->hdr.destnode));
276         }
277 }
278
279
280 struct daemon_call_state {
281         struct ctdb_client *client;
282         uint32_t reqid;
283         struct ctdb_call *call;
284         struct timeval start_time;
285 };
286
287 /* 
288    complete a call from a client 
289 */
290 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
291 {
292         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
293                                                            struct daemon_call_state);
294         struct ctdb_reply_call *r;
295         int res;
296         uint32_t length;
297         struct ctdb_client *client = dstate->client;
298
299         talloc_steal(client, dstate);
300         talloc_steal(dstate, dstate->call);
301
302         res = ctdb_daemon_call_recv(state, dstate->call);
303         if (res != 0) {
304                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
305                 client->ctdb->statistics.pending_calls--;
306                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
307                 return;
308         }
309
310         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
311         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
312                                length, struct ctdb_reply_call);
313         if (r == NULL) {
314                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
315                 client->ctdb->statistics.pending_calls--;
316                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
317                 return;
318         }
319         r->hdr.reqid        = dstate->reqid;
320         r->datalen          = dstate->call->reply_data.dsize;
321         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
322
323         res = daemon_queue_send(client, &r->hdr);
324         if (res != 0) {
325                 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
326         }
327         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
328         talloc_free(dstate);
329         client->ctdb->statistics.pending_calls--;
330 }
331
332
333 static void daemon_request_call_from_client(struct ctdb_client *client, 
334                                             struct ctdb_req_call *c);
335
336 /*
337   this is called when the ctdb daemon received a ctdb request call
338   from a local client over the unix domain socket
339  */
340 static void daemon_request_call_from_client(struct ctdb_client *client, 
341                                             struct ctdb_req_call *c)
342 {
343         struct ctdb_call_state *state;
344         struct ctdb_db_context *ctdb_db;
345         struct daemon_call_state *dstate;
346         struct ctdb_call *call;
347         struct ctdb_ltdb_header header;
348         TDB_DATA key, data;
349         int ret;
350         struct ctdb_context *ctdb = client->ctdb;
351
352         ctdb->statistics.total_calls++;
353         ctdb->statistics.pending_calls++;
354
355         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
356         if (!ctdb_db) {
357                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
358                           c->db_id));
359                 ctdb->statistics.pending_calls--;
360                 return;
361         }
362
363         key.dptr = c->data;
364         key.dsize = c->keylen;
365
366         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
367                                            (struct ctdb_req_header *)c, &data,
368                                            daemon_incoming_packet, client, True);
369         if (ret == -2) {
370                 /* will retry later */
371                 ctdb->statistics.pending_calls--;
372                 return;
373         }
374
375         if (ret != 0) {
376                 DEBUG(0,(__location__ " Unable to fetch record\n"));
377                 ctdb->statistics.pending_calls--;
378                 return;
379         }
380
381         dstate = talloc(client, struct daemon_call_state);
382         if (dstate == NULL) {
383                 ctdb_ltdb_unlock(ctdb_db, key);
384                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
385                 ctdb->statistics.pending_calls--;
386                 return;
387         }
388         dstate->start_time = timeval_current();
389         dstate->client = client;
390         dstate->reqid  = c->hdr.reqid;
391         talloc_steal(dstate, data.dptr);
392
393         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
394         if (call == NULL) {
395                 ctdb_ltdb_unlock(ctdb_db, key);
396                 DEBUG(0,(__location__ " Unable to allocate call\n"));
397                 ctdb->statistics.pending_calls--;
398                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
399                 return;
400         }
401
402         call->call_id = c->callid;
403         call->key = key;
404         call->call_data.dptr = c->data + c->keylen;
405         call->call_data.dsize = c->calldatalen;
406         call->flags = c->flags;
407
408         if (header.dmaster == ctdb->vnn) {
409                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
410         } else {
411                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
412         }
413
414         ctdb_ltdb_unlock(ctdb_db, key);
415
416         if (state == NULL) {
417                 DEBUG(0,(__location__ " Unable to setup call send\n"));
418                 ctdb->statistics.pending_calls--;
419                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
420                 return;
421         }
422         talloc_steal(state, dstate);
423         talloc_steal(client, state);
424
425         state->async.fn = daemon_call_from_client_callback;
426         state->async.private_data = dstate;
427 }
428
429
430 static void daemon_request_control_from_client(struct ctdb_client *client, 
431                                                struct ctdb_req_control *c);
432
433 /* data contains a packet from the client */
434 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
435 {
436         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
437         TALLOC_CTX *tmp_ctx;
438         struct ctdb_context *ctdb = client->ctdb;
439
440         /* place the packet as a child of a tmp_ctx. We then use
441            talloc_free() below to free it. If any of the calls want
442            to keep it, then they will steal it somewhere else, and the
443            talloc_free() will be a no-op */
444         tmp_ctx = talloc_new(client);
445         talloc_steal(tmp_ctx, hdr);
446
447         if (hdr->ctdb_magic != CTDB_MAGIC) {
448                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
449                 goto done;
450         }
451
452         if (hdr->ctdb_version != CTDB_VERSION) {
453                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
454                 goto done;
455         }
456
457         switch (hdr->operation) {
458         case CTDB_REQ_CALL:
459                 ctdb->statistics.client.req_call++;
460                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
461                 break;
462
463         case CTDB_REQ_MESSAGE:
464                 ctdb->statistics.client.req_message++;
465                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
466                 break;
467
468         case CTDB_REQ_CONTROL:
469                 ctdb->statistics.client.req_control++;
470                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
471                 break;
472
473         default:
474                 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
475                          hdr->operation));
476         }
477
478 done:
479         talloc_free(tmp_ctx);
480 }
481
482 /*
483   called when the daemon gets a incoming packet
484  */
485 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
486 {
487         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
488         struct ctdb_req_header *hdr;
489
490         if (cnt == 0) {
491                 talloc_free(client);
492                 return;
493         }
494
495         client->ctdb->statistics.client_packets_recv++;
496
497         if (cnt < sizeof(*hdr)) {
498                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
499                                (unsigned)cnt);
500                 return;
501         }
502         hdr = (struct ctdb_req_header *)data;
503         if (cnt != hdr->length) {
504                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
505                                (unsigned)hdr->length, (unsigned)cnt);
506                 return;
507         }
508
509         if (hdr->ctdb_magic != CTDB_MAGIC) {
510                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
511                 return;
512         }
513
514         if (hdr->ctdb_version != CTDB_VERSION) {
515                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
516                 return;
517         }
518
519         DEBUG(3,(__location__ " client request %u of type %u length %u from "
520                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
521                  hdr->srcnode, hdr->destnode));
522
523         /* it is the responsibility of the incoming packet function to free 'data' */
524         daemon_incoming_packet(client, hdr);
525 }
526
527 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
528                          uint16_t flags, void *private_data)
529 {
530         struct sockaddr_in addr;
531         socklen_t len;
532         int fd;
533         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
534         struct ctdb_client *client;
535
536         memset(&addr, 0, sizeof(addr));
537         len = sizeof(addr);
538         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
539         if (fd == -1) {
540                 return;
541         }
542
543         set_nonblocking(fd);
544         set_close_on_exec(fd);
545
546         client = talloc_zero(ctdb, struct ctdb_client);
547         client->ctdb = ctdb;
548         client->fd = fd;
549         client->client_id = ctdb_reqid_new(ctdb, client);
550         ctdb->statistics.num_clients++;
551
552         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
553                                          ctdb_daemon_read_cb, client);
554
555         talloc_set_destructor(client, ctdb_client_destructor);
556 }
557
558
559
560 /*
561   create a unix domain socket and bind it
562   return a file descriptor open on the socket 
563 */
564 static int ux_socket_bind(struct ctdb_context *ctdb)
565 {
566         struct sockaddr_un addr;
567
568         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
569         if (ctdb->daemon.sd == -1) {
570                 return -1;
571         }
572
573         set_nonblocking(ctdb->daemon.sd);
574         set_close_on_exec(ctdb->daemon.sd);
575
576 #if 0
577         /* AIX doesn't like this :( */
578         if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
579             fchmod(ctdb->daemon.sd, 0700) != 0) {
580                 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
581                 goto failed;
582         }
583 #endif
584
585         set_nonblocking(ctdb->daemon.sd);
586
587         memset(&addr, 0, sizeof(addr));
588         addr.sun_family = AF_UNIX;
589         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
590
591         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
592                 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
593                 goto failed;
594         }       
595         if (listen(ctdb->daemon.sd, 10) != 0) {
596                 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
597                 goto failed;
598         }
599
600         return 0;
601
602 failed:
603         close(ctdb->daemon.sd);
604         ctdb->daemon.sd = -1;
605         return -1;      
606 }
607
608 /*
609   delete the socket on exit - called on destruction of autofree context
610  */
611 static int unlink_destructor(const char *name)
612 {
613         unlink(name);
614         return 0;
615 }
616
617
618 /*
619   start the protocol going as a daemon
620 */
621 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
622 {
623         int res;
624         struct fd_event *fde;
625         const char *domain_socket_name;
626
627         /* get rid of any old sockets */
628         unlink(ctdb->daemon.name);
629
630         /* create a unix domain stream socket to listen to */
631         res = ux_socket_bind(ctdb);
632         if (res!=0) {
633                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
634                 exit(10);
635         }
636
637         if (do_fork && fork()) {
638                 return 0;
639         }
640
641         tdb_reopen_all(False);
642
643         if (do_fork) {
644                 setsid();
645         }
646         block_signal(SIGPIPE);
647
648         /* try to set us up as realtime */
649         ctdb_set_realtime(true);
650
651         /* ensure the socket is deleted on exit of the daemon */
652         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
653         talloc_set_destructor(domain_socket_name, unlink_destructor);   
654
655         ctdb->ev = event_context_init(NULL);
656
657         /* start frozen, then let the first election sort things out */
658         if (!ctdb_blocking_freeze(ctdb)) {
659                 DEBUG(0,("Failed to get initial freeze\n"));
660                 exit(12);
661         }
662
663         /* force initial recovery for election */
664         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
665
666         /* now start accepting clients, only can do this once frozen */
667         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
668                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
669                            ctdb_accept_client, ctdb);
670
671         ctdb_main_loop(ctdb);
672
673         return 0;
674 }
675
676 /*
677   allocate a packet for use in daemon<->daemon communication
678  */
679 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
680                                                  TALLOC_CTX *mem_ctx, 
681                                                  enum ctdb_operation operation, 
682                                                  size_t length, size_t slength,
683                                                  const char *type)
684 {
685         int size;
686         struct ctdb_req_header *hdr;
687
688         length = MAX(length, slength);
689         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
690
691         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
692         if (hdr == NULL) {
693                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
694                          operation, (unsigned)length));
695                 return NULL;
696         }
697         talloc_set_name_const(hdr, type);
698         memset(hdr, 0, slength);
699         hdr->length       = length;
700         hdr->operation    = operation;
701         hdr->ctdb_magic   = CTDB_MAGIC;
702         hdr->ctdb_version = CTDB_VERSION;
703         hdr->generation   = ctdb->vnn_map->generation;
704         hdr->srcnode      = ctdb->vnn;
705
706         return hdr;     
707 }
708
709 struct daemon_control_state {
710         struct daemon_control_state *next, *prev;
711         struct ctdb_client *client;
712         struct ctdb_req_control *c;
713         uint32_t reqid;
714         struct ctdb_node *node;
715 };
716
717 /*
718   callback when a control reply comes in
719  */
720 static void daemon_control_callback(struct ctdb_context *ctdb,
721                                     int32_t status, TDB_DATA data, 
722                                     const char *errormsg,
723                                     void *private_data)
724 {
725         struct daemon_control_state *state = talloc_get_type(private_data, 
726                                                              struct daemon_control_state);
727         struct ctdb_client *client = state->client;
728         struct ctdb_reply_control *r;
729         size_t len;
730
731         /* construct a message to send to the client containing the data */
732         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
733         if (errormsg) {
734                 len += strlen(errormsg);
735         }
736         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
737                                struct ctdb_reply_control);
738         CTDB_NO_MEMORY_VOID(ctdb, r);
739
740         r->hdr.reqid     = state->reqid;
741         r->status        = status;
742         r->datalen       = data.dsize;
743         r->errorlen = 0;
744         memcpy(&r->data[0], data.dptr, data.dsize);
745         if (errormsg) {
746                 r->errorlen = strlen(errormsg);
747                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
748         }
749
750         daemon_queue_send(client, &r->hdr);
751
752         talloc_free(state);
753 }
754
755 /*
756   fail all pending controls to a disconnected node
757  */
758 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
759 {
760         struct daemon_control_state *state;
761         while ((state = node->pending_controls)) {
762                 DLIST_REMOVE(node->pending_controls, state);
763                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
764                                         "node is disconnected", state);
765         }
766 }
767
768 /*
769   destroy a daemon_control_state
770  */
771 static int daemon_control_destructor(struct daemon_control_state *state)
772 {
773         if (state->node) {
774                 DLIST_REMOVE(state->node->pending_controls, state);
775         }
776         return 0;
777 }
778
779 /*
780   this is called when the ctdb daemon received a ctdb request control
781   from a local client over the unix domain socket
782  */
783 static void daemon_request_control_from_client(struct ctdb_client *client, 
784                                                struct ctdb_req_control *c)
785 {
786         TDB_DATA data;
787         int res;
788         struct daemon_control_state *state;
789         TALLOC_CTX *tmp_ctx = talloc_new(client);
790
791         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
792                 c->hdr.destnode = client->ctdb->vnn;
793         }
794
795         state = talloc(client, struct daemon_control_state);
796         CTDB_NO_MEMORY_VOID(client->ctdb, state);
797
798         state->client = client;
799         state->c = talloc_steal(state, c);
800         state->reqid = c->hdr.reqid;
801         if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) {
802                 state->node = client->ctdb->nodes[c->hdr.destnode];
803                 DLIST_ADD(state->node->pending_controls, state);
804         } else {
805                 state->node = NULL;
806         }
807
808         talloc_set_destructor(state, daemon_control_destructor);
809
810         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
811                 talloc_steal(tmp_ctx, state);
812         }
813         
814         data.dptr = &c->data[0];
815         data.dsize = c->datalen;
816         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
817                                        c->srvid, c->opcode, client->client_id,
818                                        c->flags,
819                                        data, daemon_control_callback,
820                                        state);
821         if (res != 0) {
822                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
823                          c->hdr.destnode));
824         }
825
826         talloc_free(tmp_ctx);
827 }
828
829 /*
830   register a call function
831 */
832 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
833                          ctdb_fn_t fn, int id)
834 {
835         struct ctdb_registered_call *call;
836         struct ctdb_db_context *ctdb_db;
837
838         ctdb_db = find_ctdb_db(ctdb, db_id);
839         if (ctdb_db == NULL) {
840                 return -1;
841         }
842
843         call = talloc(ctdb_db, struct ctdb_registered_call);
844         call->fn = fn;
845         call->id = id;
846
847         DLIST_ADD(ctdb_db->calls, call);        
848         return 0;
849 }
850
851
852
853 /*
854   this local messaging handler is ugly, but is needed to prevent
855   recursion in ctdb_send_message() when the destination node is the
856   same as the source node
857  */
858 struct ctdb_local_message {
859         struct ctdb_context *ctdb;
860         uint64_t srvid;
861         TDB_DATA data;
862 };
863
864 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
865                                        struct timeval t, void *private_data)
866 {
867         struct ctdb_local_message *m = talloc_get_type(private_data, 
868                                                        struct ctdb_local_message);
869         int res;
870
871         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
872         if (res != 0) {
873                 DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", 
874                           (unsigned long long)m->srvid));
875         }
876         talloc_free(m);
877 }
878
879 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
880 {
881         struct ctdb_local_message *m;
882         m = talloc(ctdb, struct ctdb_local_message);
883         CTDB_NO_MEMORY(ctdb, m);
884
885         m->ctdb = ctdb;
886         m->srvid = srvid;
887         m->data  = data;
888         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
889         if (m->data.dptr == NULL) {
890                 talloc_free(m);
891                 return -1;
892         }
893
894         /* this needs to be done as an event to prevent recursion */
895         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
896         return 0;
897 }
898
899 /*
900   send a ctdb message
901 */
902 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
903                              uint64_t srvid, TDB_DATA data)
904 {
905         struct ctdb_req_message *r;
906         int len;
907
908         /* see if this is a message to ourselves */
909         if (vnn == ctdb->vnn) {
910                 return ctdb_local_message(ctdb, srvid, data);
911         }
912
913         len = offsetof(struct ctdb_req_message, data) + data.dsize;
914         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
915                                     struct ctdb_req_message);
916         CTDB_NO_MEMORY(ctdb, r);
917
918         r->hdr.destnode  = vnn;
919         r->srvid         = srvid;
920         r->datalen       = data.dsize;
921         memcpy(&r->data[0], data.dptr, data.dsize);
922
923         ctdb_queue_packet(ctdb, &r->hdr);
924
925         talloc_free(r);
926         return 0;
927 }
928