r23806: update Samba4 with the latest ctdb code.
[jra/samba/.git] / source4 / cluster / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
32
33 /*
34   handler for when a node changes its flags
35 */
36 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
37                                 TDB_DATA data, void *private_data)
38 {
39         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
40
41         if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) {
42                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
43                 return;
44         }
45
46         if (!ctdb_validate_vnn(ctdb, c->vnn)) {
47                 DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
48                 return;
49         }
50
51         /* don't get the disconnected flag from the other node */
52         ctdb->nodes[c->vnn]->flags = 
53                 (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
54                 | (c->flags & ~NODE_FLAGS_DISCONNECTED);        
55         DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));
56
57         /* make sure we don't hold any IPs when we shouldn't */
58         if (c->vnn == ctdb->vnn &&
59             (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
60                 ctdb_release_all_ips(ctdb);
61         }
62 }
63
64 /* called when the "startup" event script has finished */
65 static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
66 {
67         if (status != 0) {
68                 DEBUG(0,("startup event failed!\n"));
69                 ctdb_fatal(ctdb, "startup event script failed");                
70         }
71
72         /* start the transport running */
73         if (ctdb->methods->start(ctdb) != 0) {
74                 DEBUG(0,("transport failed to start!\n"));
75                 ctdb_fatal(ctdb, "transport failed to start");
76         }
77
78         /* start the recovery daemon process */
79         if (ctdb_start_recoverd(ctdb) != 0) {
80                 DEBUG(0,("Failed to start recovery daemon\n"));
81                 exit(11);
82         }
83
84         /* a handler for when nodes are disabled/enabled */
85         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
86                                       flag_change_handler, NULL);
87
88         /* start monitoring for dead nodes */
89         ctdb_start_monitoring(ctdb);
90 }
91
92 /* go into main ctdb loop */
93 static void ctdb_main_loop(struct ctdb_context *ctdb)
94 {
95         int ret = -1;
96
97         if (strcmp(ctdb->transport, "tcp") == 0) {
98                 int ctdb_tcp_init(struct ctdb_context *);
99                 ret = ctdb_tcp_init(ctdb);
100         }
101 #ifdef USE_INFINIBAND
102         if (strcmp(ctdb->transport, "ib") == 0) {
103                 int ctdb_ibw_init(struct ctdb_context *);
104                 ret = ctdb_ibw_init(ctdb);
105         }
106 #endif
107         if (ret != 0) {
108                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
109                 return;
110         }
111
112         /* initialise the transport  */
113         if (ctdb->methods->initialise(ctdb) != 0) {
114                 DEBUG(0,("transport failed to initialise!\n"));
115                 ctdb_fatal(ctdb, "transport failed to initialise");
116         }
117
118         /* tell all other nodes we've just started up */
119         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
120                                  0, CTDB_CONTROL_STARTUP, 0,
121                                  CTDB_CTRL_FLAG_NOREPLY,
122                                  tdb_null, NULL, NULL);
123
124         /* release any IPs we hold from previous runs of the daemon */
125         ctdb_release_all_ips(ctdb);
126
127         ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
128                                          ctdb_start_transport, NULL, "startup");
129         if (ret != 0) {
130                 DEBUG(0,("Failed startup event script\n"));
131                 return;
132         }
133
134         /* go into a wait loop to allow other nodes to complete */
135         event_loop_wait(ctdb->ev);
136
137         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
138         exit(1);
139 }
140
141
142 static void block_signal(int signum)
143 {
144         struct sigaction act;
145
146         memset(&act, 0, sizeof(act));
147
148         act.sa_handler = SIG_IGN;
149         sigemptyset(&act.sa_mask);
150         sigaddset(&act.sa_mask, signum);
151         sigaction(signum, &act, NULL);
152 }
153
154
155 /*
156   send a packet to a client
157  */
158 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
159 {
160         client->ctdb->statistics.client_packets_sent++;
161         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
162 }
163
164 /*
165   message handler for when we are in daemon mode. This redirects the message
166   to the right client
167  */
168 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
169                                     TDB_DATA data, void *private_data)
170 {
171         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
172         struct ctdb_req_message *r;
173         int len;
174
175         /* construct a message to send to the client containing the data */
176         len = offsetof(struct ctdb_req_message, data) + data.dsize;
177         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
178                                len, struct ctdb_req_message);
179         CTDB_NO_MEMORY_VOID(ctdb, r);
180
181         talloc_set_name_const(r, "req_message packet");
182
183         r->srvid         = srvid;
184         r->datalen       = data.dsize;
185         memcpy(&r->data[0], data.dptr, data.dsize);
186
187         daemon_queue_send(client, &r->hdr);
188
189         talloc_free(r);
190 }
191                                            
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   set the srvid from the client
196  */
197 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         int res;
201         if (client == NULL) {
202                 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
203                 return -1;
204         }
205         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
206         if (res != 0) {
207                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
208                          (unsigned long long)srvid));
209         } else {
210                 DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
211                          (unsigned long long)srvid));
212         }
213
214         /* this is a hack for Samba - we now know the pid of the Samba client */
215         if ((srvid & 0xFFFFFFFF) == srvid &&
216             kill(srvid, 0) == 0) {
217                 client->pid = srvid;
218                 DEBUG(0,(__location__ " Registered PID %u for client %u\n",
219                          (unsigned)client->pid, client_id));
220         }
221         return res;
222 }
223
224 /*
225   this is called when the ctdb daemon received a ctdb request to 
226   remove a srvid from the client
227  */
228 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
229 {
230         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
231         if (client == NULL) {
232                 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
233                 return -1;
234         }
235         return ctdb_deregister_message_handler(ctdb, srvid, client);
236 }
237
238
239 /*
240   destroy a ctdb_client
241 */
242 static int ctdb_client_destructor(struct ctdb_client *client)
243 {
244         ctdb_takeover_client_destructor_hook(client);
245         ctdb_reqid_remove(client->ctdb, client->client_id);
246         client->ctdb->statistics.num_clients--;
247         return 0;
248 }
249
250
251 /*
252   this is called when the ctdb daemon received a ctdb request message
253   from a local client over the unix domain socket
254  */
255 static void daemon_request_message_from_client(struct ctdb_client *client, 
256                                                struct ctdb_req_message *c)
257 {
258         TDB_DATA data;
259         int res;
260
261         /* maybe the message is for another client on this node */
262         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
263                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
264                 return;
265         }
266
267         /* its for a remote node */
268         data.dptr = &c->data[0];
269         data.dsize = c->datalen;
270         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
271                                        c->srvid, data);
272         if (res != 0) {
273                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
274                          c->hdr.destnode));
275         }
276 }
277
278
279 struct daemon_call_state {
280         struct ctdb_client *client;
281         uint32_t reqid;
282         struct ctdb_call *call;
283         struct timeval start_time;
284 };
285
286 /* 
287    complete a call from a client 
288 */
289 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
290 {
291         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
292                                                            struct daemon_call_state);
293         struct ctdb_reply_call *r;
294         int res;
295         uint32_t length;
296         struct ctdb_client *client = dstate->client;
297
298         talloc_steal(client, dstate);
299         talloc_steal(dstate, dstate->call);
300
301         res = ctdb_daemon_call_recv(state, dstate->call);
302         if (res != 0) {
303                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
304                 client->ctdb->statistics.pending_calls--;
305                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
306                 return;
307         }
308
309         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
310         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
311                                length, struct ctdb_reply_call);
312         if (r == NULL) {
313                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
314                 client->ctdb->statistics.pending_calls--;
315                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
316                 return;
317         }
318         r->hdr.reqid        = dstate->reqid;
319         r->datalen          = dstate->call->reply_data.dsize;
320         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
321
322         res = daemon_queue_send(client, &r->hdr);
323         if (res != 0) {
324                 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
325         }
326         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
327         talloc_free(dstate);
328         client->ctdb->statistics.pending_calls--;
329 }
330
331
332 static void daemon_request_call_from_client(struct ctdb_client *client, 
333                                             struct ctdb_req_call *c);
334
335 /*
336   this is called when the ctdb daemon received a ctdb request call
337   from a local client over the unix domain socket
338  */
339 static void daemon_request_call_from_client(struct ctdb_client *client, 
340                                             struct ctdb_req_call *c)
341 {
342         struct ctdb_call_state *state;
343         struct ctdb_db_context *ctdb_db;
344         struct daemon_call_state *dstate;
345         struct ctdb_call *call;
346         struct ctdb_ltdb_header header;
347         TDB_DATA key, data;
348         int ret;
349         struct ctdb_context *ctdb = client->ctdb;
350
351         ctdb->statistics.total_calls++;
352         ctdb->statistics.pending_calls++;
353
354         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
355         if (!ctdb_db) {
356                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
357                           c->db_id));
358                 ctdb->statistics.pending_calls--;
359                 return;
360         }
361
362         key.dptr = c->data;
363         key.dsize = c->keylen;
364
365         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
366                                            (struct ctdb_req_header *)c, &data,
367                                            daemon_incoming_packet, client, True);
368         if (ret == -2) {
369                 /* will retry later */
370                 ctdb->statistics.pending_calls--;
371                 return;
372         }
373
374         if (ret != 0) {
375                 DEBUG(0,(__location__ " Unable to fetch record\n"));
376                 ctdb->statistics.pending_calls--;
377                 return;
378         }
379
380         dstate = talloc(client, struct daemon_call_state);
381         if (dstate == NULL) {
382                 ctdb_ltdb_unlock(ctdb_db, key);
383                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
384                 ctdb->statistics.pending_calls--;
385                 return;
386         }
387         dstate->start_time = timeval_current();
388         dstate->client = client;
389         dstate->reqid  = c->hdr.reqid;
390         talloc_steal(dstate, data.dptr);
391
392         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
393         if (call == NULL) {
394                 ctdb_ltdb_unlock(ctdb_db, key);
395                 DEBUG(0,(__location__ " Unable to allocate call\n"));
396                 ctdb->statistics.pending_calls--;
397                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
398                 return;
399         }
400
401         call->call_id = c->callid;
402         call->key = key;
403         call->call_data.dptr = c->data + c->keylen;
404         call->call_data.dsize = c->calldatalen;
405         call->flags = c->flags;
406
407         if (header.dmaster == ctdb->vnn) {
408                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
409         } else {
410                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
411         }
412
413         ctdb_ltdb_unlock(ctdb_db, key);
414
415         if (state == NULL) {
416                 DEBUG(0,(__location__ " Unable to setup call send\n"));
417                 ctdb->statistics.pending_calls--;
418                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
419                 return;
420         }
421         talloc_steal(state, dstate);
422         talloc_steal(client, state);
423
424         state->async.fn = daemon_call_from_client_callback;
425         state->async.private_data = dstate;
426 }
427
428
429 static void daemon_request_control_from_client(struct ctdb_client *client, 
430                                                struct ctdb_req_control *c);
431
432 /* data contains a packet from the client */
433 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
434 {
435         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
436         TALLOC_CTX *tmp_ctx;
437         struct ctdb_context *ctdb = client->ctdb;
438
439         /* place the packet as a child of a tmp_ctx. We then use
440            talloc_free() below to free it. If any of the calls want
441            to keep it, then they will steal it somewhere else, and the
442            talloc_free() will be a no-op */
443         tmp_ctx = talloc_new(client);
444         talloc_steal(tmp_ctx, hdr);
445
446         if (hdr->ctdb_magic != CTDB_MAGIC) {
447                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
448                 goto done;
449         }
450
451         if (hdr->ctdb_version != CTDB_VERSION) {
452                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
453                 goto done;
454         }
455
456         switch (hdr->operation) {
457         case CTDB_REQ_CALL:
458                 ctdb->statistics.client.req_call++;
459                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
460                 break;
461
462         case CTDB_REQ_MESSAGE:
463                 ctdb->statistics.client.req_message++;
464                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
465                 break;
466
467         case CTDB_REQ_CONTROL:
468                 ctdb->statistics.client.req_control++;
469                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
470                 break;
471
472         default:
473                 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
474                          hdr->operation));
475         }
476
477 done:
478         talloc_free(tmp_ctx);
479 }
480
481 /*
482   called when the daemon gets a incoming packet
483  */
484 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
485 {
486         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
487         struct ctdb_req_header *hdr;
488
489         if (cnt == 0) {
490                 talloc_free(client);
491                 return;
492         }
493
494         client->ctdb->statistics.client_packets_recv++;
495
496         if (cnt < sizeof(*hdr)) {
497                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
498                                (unsigned)cnt);
499                 return;
500         }
501         hdr = (struct ctdb_req_header *)data;
502         if (cnt != hdr->length) {
503                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
504                                (unsigned)hdr->length, (unsigned)cnt);
505                 return;
506         }
507
508         if (hdr->ctdb_magic != CTDB_MAGIC) {
509                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
510                 return;
511         }
512
513         if (hdr->ctdb_version != CTDB_VERSION) {
514                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
515                 return;
516         }
517
518         DEBUG(3,(__location__ " client request %u of type %u length %u from "
519                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
520                  hdr->srcnode, hdr->destnode));
521
522         /* it is the responsibility of the incoming packet function to free 'data' */
523         daemon_incoming_packet(client, hdr);
524 }
525
526 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
527                          uint16_t flags, void *private_data)
528 {
529         struct sockaddr_in addr;
530         socklen_t len;
531         int fd;
532         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
533         struct ctdb_client *client;
534
535         memset(&addr, 0, sizeof(addr));
536         len = sizeof(addr);
537         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
538         if (fd == -1) {
539                 return;
540         }
541
542         set_nonblocking(fd);
543         set_close_on_exec(fd);
544
545         client = talloc_zero(ctdb, struct ctdb_client);
546         client->ctdb = ctdb;
547         client->fd = fd;
548         client->client_id = ctdb_reqid_new(ctdb, client);
549         ctdb->statistics.num_clients++;
550
551         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
552                                          ctdb_daemon_read_cb, client);
553
554         talloc_set_destructor(client, ctdb_client_destructor);
555 }
556
557
558
559 /*
560   create a unix domain socket and bind it
561   return a file descriptor open on the socket 
562 */
563 static int ux_socket_bind(struct ctdb_context *ctdb)
564 {
565         struct sockaddr_un addr;
566
567         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
568         if (ctdb->daemon.sd == -1) {
569                 return -1;
570         }
571
572         set_nonblocking(ctdb->daemon.sd);
573         set_close_on_exec(ctdb->daemon.sd);
574
575 #if 0
576         /* AIX doesn't like this :( */
577         if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
578             fchmod(ctdb->daemon.sd, 0700) != 0) {
579                 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
580                 goto failed;
581         }
582 #endif
583
584         set_nonblocking(ctdb->daemon.sd);
585
586         memset(&addr, 0, sizeof(addr));
587         addr.sun_family = AF_UNIX;
588         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
589
590         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
591                 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
592                 goto failed;
593         }       
594         if (listen(ctdb->daemon.sd, 10) != 0) {
595                 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
596                 goto failed;
597         }
598
599         return 0;
600
601 failed:
602         close(ctdb->daemon.sd);
603         ctdb->daemon.sd = -1;
604         return -1;      
605 }
606
607 /*
608   delete the socket on exit - called on destruction of autofree context
609  */
610 static int unlink_destructor(const char *name)
611 {
612         unlink(name);
613         return 0;
614 }
615
616
617 /*
618   start the protocol going as a daemon
619 */
620 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
621 {
622         int res;
623         struct fd_event *fde;
624         const char *domain_socket_name;
625
626         /* get rid of any old sockets */
627         unlink(ctdb->daemon.name);
628
629         /* create a unix domain stream socket to listen to */
630         res = ux_socket_bind(ctdb);
631         if (res!=0) {
632                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
633                 exit(10);
634         }
635
636         if (do_fork && fork()) {
637                 return 0;
638         }
639
640         tdb_reopen_all(False);
641
642         if (do_fork) {
643                 setsid();
644         }
645         block_signal(SIGPIPE);
646
647         /* try to set us up as realtime */
648         ctdb_set_realtime(true);
649
650         /* ensure the socket is deleted on exit of the daemon */
651         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
652         talloc_set_destructor(domain_socket_name, unlink_destructor);   
653
654         ctdb->ev = event_context_init(NULL);
655
656         /* start frozen, then let the first election sort things out */
657         if (!ctdb_blocking_freeze(ctdb)) {
658                 DEBUG(0,("Failed to get initial freeze\n"));
659                 exit(12);
660         }
661
662         /* force initial recovery for election */
663         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
664
665         /* now start accepting clients, only can do this once frozen */
666         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
667                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
668                            ctdb_accept_client, ctdb);
669
670         ctdb_main_loop(ctdb);
671
672         return 0;
673 }
674
675 /*
676   allocate a packet for use in daemon<->daemon communication
677  */
678 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
679                                                  TALLOC_CTX *mem_ctx, 
680                                                  enum ctdb_operation operation, 
681                                                  size_t length, size_t slength,
682                                                  const char *type)
683 {
684         int size;
685         struct ctdb_req_header *hdr;
686
687         length = MAX(length, slength);
688         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
689
690         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
691         if (hdr == NULL) {
692                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
693                          operation, (unsigned)length));
694                 return NULL;
695         }
696         talloc_set_name_const(hdr, type);
697         memset(hdr, 0, slength);
698         hdr->length       = length;
699         hdr->operation    = operation;
700         hdr->ctdb_magic   = CTDB_MAGIC;
701         hdr->ctdb_version = CTDB_VERSION;
702         hdr->generation   = ctdb->vnn_map->generation;
703         hdr->srcnode      = ctdb->vnn;
704
705         return hdr;     
706 }
707
708 struct daemon_control_state {
709         struct daemon_control_state *next, *prev;
710         struct ctdb_client *client;
711         struct ctdb_req_control *c;
712         uint32_t reqid;
713         struct ctdb_node *node;
714 };
715
716 /*
717   callback when a control reply comes in
718  */
719 static void daemon_control_callback(struct ctdb_context *ctdb,
720                                     int32_t status, TDB_DATA data, 
721                                     const char *errormsg,
722                                     void *private_data)
723 {
724         struct daemon_control_state *state = talloc_get_type(private_data, 
725                                                              struct daemon_control_state);
726         struct ctdb_client *client = state->client;
727         struct ctdb_reply_control *r;
728         size_t len;
729
730         /* construct a message to send to the client containing the data */
731         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
732         if (errormsg) {
733                 len += strlen(errormsg);
734         }
735         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
736                                struct ctdb_reply_control);
737         CTDB_NO_MEMORY_VOID(ctdb, r);
738
739         r->hdr.reqid     = state->reqid;
740         r->status        = status;
741         r->datalen       = data.dsize;
742         r->errorlen = 0;
743         memcpy(&r->data[0], data.dptr, data.dsize);
744         if (errormsg) {
745                 r->errorlen = strlen(errormsg);
746                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
747         }
748
749         daemon_queue_send(client, &r->hdr);
750
751         talloc_free(state);
752 }
753
754 /*
755   fail all pending controls to a disconnected node
756  */
757 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
758 {
759         struct daemon_control_state *state;
760         while ((state = node->pending_controls)) {
761                 DLIST_REMOVE(node->pending_controls, state);
762                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
763                                         "node is disconnected", state);
764         }
765 }
766
767 /*
768   destroy a daemon_control_state
769  */
770 static int daemon_control_destructor(struct daemon_control_state *state)
771 {
772         if (state->node) {
773                 DLIST_REMOVE(state->node->pending_controls, state);
774         }
775         return 0;
776 }
777
778 /*
779   this is called when the ctdb daemon received a ctdb request control
780   from a local client over the unix domain socket
781  */
782 static void daemon_request_control_from_client(struct ctdb_client *client, 
783                                                struct ctdb_req_control *c)
784 {
785         TDB_DATA data;
786         int res;
787         struct daemon_control_state *state;
788         TALLOC_CTX *tmp_ctx = talloc_new(client);
789
790         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
791                 c->hdr.destnode = client->ctdb->vnn;
792         }
793
794         state = talloc(client, struct daemon_control_state);
795         CTDB_NO_MEMORY_VOID(client->ctdb, state);
796
797         state->client = client;
798         state->c = talloc_steal(state, c);
799         state->reqid = c->hdr.reqid;
800         if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) {
801                 state->node = client->ctdb->nodes[c->hdr.destnode];
802                 DLIST_ADD(state->node->pending_controls, state);
803         } else {
804                 state->node = NULL;
805         }
806
807         talloc_set_destructor(state, daemon_control_destructor);
808
809         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
810                 talloc_steal(tmp_ctx, state);
811         }
812         
813         data.dptr = &c->data[0];
814         data.dsize = c->datalen;
815         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
816                                        c->srvid, c->opcode, client->client_id,
817                                        c->flags,
818                                        data, daemon_control_callback,
819                                        state);
820         if (res != 0) {
821                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
822                          c->hdr.destnode));
823         }
824
825         talloc_free(tmp_ctx);
826 }
827
828 /*
829   register a call function
830 */
831 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
832                          ctdb_fn_t fn, int id)
833 {
834         struct ctdb_registered_call *call;
835         struct ctdb_db_context *ctdb_db;
836
837         ctdb_db = find_ctdb_db(ctdb, db_id);
838         if (ctdb_db == NULL) {
839                 return -1;
840         }
841
842         call = talloc(ctdb_db, struct ctdb_registered_call);
843         call->fn = fn;
844         call->id = id;
845
846         DLIST_ADD(ctdb_db->calls, call);        
847         return 0;
848 }
849
850
851
852 /*
853   this local messaging handler is ugly, but is needed to prevent
854   recursion in ctdb_send_message() when the destination node is the
855   same as the source node
856  */
857 struct ctdb_local_message {
858         struct ctdb_context *ctdb;
859         uint64_t srvid;
860         TDB_DATA data;
861 };
862
863 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
864                                        struct timeval t, void *private_data)
865 {
866         struct ctdb_local_message *m = talloc_get_type(private_data, 
867                                                        struct ctdb_local_message);
868         int res;
869
870         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
871         if (res != 0) {
872                 DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", 
873                           (unsigned long long)m->srvid));
874         }
875         talloc_free(m);
876 }
877
878 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
879 {
880         struct ctdb_local_message *m;
881         m = talloc(ctdb, struct ctdb_local_message);
882         CTDB_NO_MEMORY(ctdb, m);
883
884         m->ctdb = ctdb;
885         m->srvid = srvid;
886         m->data  = data;
887         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
888         if (m->data.dptr == NULL) {
889                 talloc_free(m);
890                 return -1;
891         }
892
893         /* this needs to be done as an event to prevent recursion */
894         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
895         return 0;
896 }
897
898 /*
899   send a ctdb message
900 */
901 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
902                              uint64_t srvid, TDB_DATA data)
903 {
904         struct ctdb_req_message *r;
905         int len;
906
907         /* see if this is a message to ourselves */
908         if (vnn == ctdb->vnn) {
909                 return ctdb_local_message(ctdb, srvid, data);
910         }
911
912         len = offsetof(struct ctdb_req_message, data) + data.dsize;
913         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
914                                     struct ctdb_req_message);
915         CTDB_NO_MEMORY(ctdb, r);
916
917         r->hdr.destnode  = vnn;
918         r->srvid         = srvid;
919         r->datalen       = data.dsize;
920         memcpy(&r->data[0], data.dptr, data.dsize);
921
922         ctdb_queue_packet(ctdb, &r->hdr);
923
924         talloc_free(r);
925         return 0;
926 }
927