merge from ronnie
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 static void ctdb_main_loop(struct ctdb_context *ctdb)
33 {
34         ctdb->methods->start(ctdb);
35
36         /* go into a wait loop to allow other nodes to complete */
37         event_loop_wait(ctdb->ev);
38
39         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
40         exit(1);
41 }
42
43
44 static void set_non_blocking(int fd)
45 {
46         unsigned v;
47         v = fcntl(fd, F_GETFL, 0);
48         fcntl(fd, F_SETFL, v | O_NONBLOCK);
49 }
50
51 static void block_signal(int signum)
52 {
53         struct sigaction act;
54
55         memset(&act, 0, sizeof(act));
56
57         act.sa_handler = SIG_IGN;
58         sigemptyset(&act.sa_mask);
59         sigaddset(&act.sa_mask, signum);
60         sigaction(signum, &act, NULL);
61 }
62
63
64 /*
65   structure describing a connected client in the daemon
66  */
67 struct ctdb_client {
68         struct ctdb_context *ctdb;
69         int fd;
70         struct ctdb_queue *queue;
71 };
72
73
74 /*
75   message handler for when we are in daemon mode. This redirects the message
76   to the right client
77  */
78 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
79                                     TDB_DATA data, void *private_data)
80 {
81         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
82         struct ctdb_req_message *r;
83         int len;
84
85         /* construct a message to send to the client containing the data */
86         len = offsetof(struct ctdb_req_message, data) + data.dsize;
87         r = ctdbd_allocate_pkt(ctdb, len);
88
89         talloc_set_name_const(r, "req_message packet");
90
91         memset(r, 0, offsetof(struct ctdb_req_message, data));
92
93         r->hdr.length    = len;
94         r->hdr.ctdb_magic = CTDB_MAGIC;
95         r->hdr.ctdb_version = CTDB_VERSION;
96         r->hdr.operation = CTDB_REQ_MESSAGE;
97         r->srvid         = srvid;
98         r->datalen       = data.dsize;
99         memcpy(&r->data[0], data.dptr, data.dsize);
100
101         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
102
103         talloc_free(r);
104 }
105                                            
106
107 /*
108   this is called when the ctdb daemon received a ctdb request to 
109   set the srvid from the client
110  */
111 static void daemon_request_register_message_handler(struct ctdb_client *client, 
112                                                     struct ctdb_req_register *c)
113 {
114         int res;
115         res = ctdb_register_message_handler(client->ctdb, client, 
116                                             c->srvid, daemon_message_handler, 
117                                             client);
118         if (res != 0) {
119                 DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", 
120                          c->srvid));
121         }
122 }
123
124
125 /*
126   called when the daemon gets a shutdown request from a client
127  */
128 static void daemon_request_shutdown(struct ctdb_client *client, 
129                                       struct ctdb_req_shutdown *f)
130 {
131         struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
132         int len;
133         uint32_t node;
134
135         /* we dont send to ourself so we can already count one daemon as
136            exiting */
137         ctdb->num_finished++;
138
139
140         /* loop over all nodes of the cluster */
141         for (node=0; node<ctdb->num_nodes;node++) {
142                 struct ctdb_req_finished *rf;
143
144                 /* dont send a message to ourself */
145                 if (ctdb->vnn == node) {
146                         continue;
147                 }
148
149                 len = sizeof(struct ctdb_req_finished);
150                 rf = ctdb->methods->allocate_pkt(ctdb, len);
151                 CTDB_NO_MEMORY_FATAL(ctdb, rf);
152                 talloc_set_name_const(rf, "ctdb_req_finished packet");
153
154                 ZERO_STRUCT(*rf);
155                 rf->hdr.length    = len;
156                 rf->hdr.ctdb_magic = CTDB_MAGIC;
157                 rf->hdr.ctdb_version = CTDB_VERSION;
158                 rf->hdr.operation = CTDB_REQ_FINISHED;
159                 rf->hdr.destnode  = node;
160                 rf->hdr.srcnode   = ctdb->vnn;
161                 rf->hdr.reqid     = 0;
162
163                 ctdb_queue_packet(ctdb, &(rf->hdr));
164
165                 talloc_free(rf);
166         }
167
168         /* wait until all nodes have are prepared to shutdown */
169         while (ctdb->num_finished != ctdb->num_nodes) {
170                 event_loop_once(ctdb->ev);
171         }
172
173         /* all daemons have requested to finish - we now exit */
174         DEBUG(1,("All daemons finished - exiting\n"));
175         _exit(0);
176 }
177
178
179
180 /*
181   called when the daemon gets a connect wait request from a client
182  */
183 static void daemon_request_connect_wait(struct ctdb_client *client, 
184                                         struct ctdb_req_connect_wait *c)
185 {
186         struct ctdb_reply_connect_wait r;
187         int res;
188
189         /* first wait - in the daemon */
190         ctdb_daemon_connect_wait(client->ctdb);
191
192         /* now send the reply */
193         ZERO_STRUCT(r);
194
195         r.hdr.length     = sizeof(r);
196         r.hdr.ctdb_magic = CTDB_MAGIC;
197         r.hdr.ctdb_version = CTDB_VERSION;
198         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
199         r.vnn           = ctdb_get_vnn(client->ctdb);
200         r.num_connected = client->ctdb->num_connected;
201         
202         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
203         if (res != 0) {
204                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
205                 return;
206         }
207 }
208
209 /*
210   destroy a ctdb_client
211 */
212 static int ctdb_client_destructor(struct ctdb_client *client)
213 {
214         close(client->fd);
215         client->fd = -1;
216         return 0;
217 }
218
219
220 /*
221   this is called when the ctdb daemon received a ctdb request message
222   from a local client over the unix domain socket
223  */
224 static void daemon_request_message_from_client(struct ctdb_client *client, 
225                                                struct ctdb_req_message *c)
226 {
227         TDB_DATA data;
228         int res;
229
230         /* maybe the message is for another client on this node */
231         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
232                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
233                 return;
234         }
235         
236         /* its for a remote node */
237         data.dptr = &c->data[0];
238         data.dsize = c->datalen;
239         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
240                                        c->srvid, data);
241         if (res != 0) {
242                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
243                          c->hdr.destnode));
244         }
245 }
246
247
248 struct daemon_call_state {
249         struct ctdb_client *client;
250         uint32_t reqid;
251         struct ctdb_call *call;
252 };
253
254 /* 
255    complete a call from a client 
256 */
257 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
258 {
259         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
260                                                            struct daemon_call_state);
261         struct ctdb_reply_call *r;
262         int res;
263         uint32_t length;
264         struct ctdb_client *client = dstate->client;
265
266         talloc_steal(client, dstate);
267         talloc_steal(dstate, dstate->call);
268
269         res = ctdb_daemon_call_recv(state, dstate->call);
270         if (res != 0) {
271                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
272                 return;
273         }
274
275         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
276         r = ctdbd_allocate_pkt(dstate, length);
277         if (r == NULL) {
278                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
279                 return;
280         }
281         memset(r, 0, offsetof(struct ctdb_reply_call, data));
282         r->hdr.length       = length;
283         r->hdr.ctdb_magic   = CTDB_MAGIC;
284         r->hdr.ctdb_version = CTDB_VERSION;
285         r->hdr.operation    = CTDB_REPLY_CALL;
286         r->hdr.reqid        = dstate->reqid;
287         r->datalen          = dstate->call->reply_data.dsize;
288         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
289
290         res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
291         if (res != 0) {
292                 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
293         }
294         talloc_free(dstate);
295 }
296
297 /*
298   this is called when the ctdb daemon received a ctdb request call
299   from a local client over the unix domain socket
300  */
301 static void daemon_request_call_from_client(struct ctdb_client *client, 
302                                             struct ctdb_req_call *c)
303 {
304         struct ctdb_call_state *state;
305         struct ctdb_db_context *ctdb_db;
306         struct daemon_call_state *dstate;
307         struct ctdb_call *call;
308
309         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
310         if (!ctdb_db) {
311                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
312                           c->db_id));
313                 return;
314         }
315
316         dstate = talloc(client, struct daemon_call_state);
317         if (dstate == NULL) {
318                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
319                 return;
320         }
321         dstate->client = client;
322         dstate->reqid  = c->hdr.reqid;
323
324         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
325         if (call == NULL) {
326                 DEBUG(0,(__location__ " Unable to allocate call\n"));
327                 return;
328         }
329
330         call->call_id = c->callid;
331         call->key.dptr = c->data;
332         call->key.dsize = c->keylen;
333         call->call_data.dptr = c->data + c->keylen;
334         call->call_data.dsize = c->calldatalen;
335
336         state = ctdb_daemon_call_send(ctdb_db, call);
337         if (state == NULL) {
338                 DEBUG(0,(__location__ " Unable to setup call send\n"));
339                 return;
340         }
341         talloc_steal(state, dstate);
342         talloc_steal(client, state);
343
344         state->async.fn = daemon_call_from_client_callback;
345         state->async.private_data = dstate;
346 }
347
348 /* data contains a packet from the client */
349 static void daemon_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
350 {
351         struct ctdb_req_header *hdr = data;
352         TALLOC_CTX *tmp_ctx;
353
354         /* place the packet as a child of a tmp_ctx. We then use
355            talloc_free() below to free it. If any of the calls want
356            to keep it, then they will steal it somewhere else, and the
357            talloc_free() will be a no-op */
358         tmp_ctx = talloc_new(client);
359         talloc_steal(tmp_ctx, hdr);
360
361         if (hdr->ctdb_magic != CTDB_MAGIC) {
362                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
363                 goto done;
364         }
365
366         if (hdr->ctdb_version != CTDB_VERSION) {
367                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
368                 goto done;
369         }
370
371         switch (hdr->operation) {
372         case CTDB_REQ_CALL:
373                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
374                 break;
375
376         case CTDB_REQ_REGISTER:
377                 daemon_request_register_message_handler(client, 
378                                                         (struct ctdb_req_register *)hdr);
379                 break;
380         case CTDB_REQ_MESSAGE:
381                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
382                 break;
383
384         case CTDB_REQ_CONNECT_WAIT:
385                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
386                 break;
387
388         case CTDB_REQ_SHUTDOWN:
389                 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
390                 break;
391
392         default:
393                 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
394                          hdr->operation));
395         }
396
397 done:
398         talloc_free(tmp_ctx);
399 }
400
401
402 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
403 {
404         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
405         struct ctdb_req_header *hdr;
406
407         if (cnt == 0) {
408                 talloc_free(client);
409                 return;
410         }
411
412         if (cnt < sizeof(*hdr)) {
413                 ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
414                 return;
415         }
416         hdr = (struct ctdb_req_header *)data;
417         if (cnt != hdr->length) {
418                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", 
419                                hdr->length, cnt);
420                 return;
421         }
422
423         if (hdr->ctdb_magic != CTDB_MAGIC) {
424                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
425                 return;
426         }
427
428         if (hdr->ctdb_version != CTDB_VERSION) {
429                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
430                 return;
431         }
432
433         /* it is the responsibility of the incoming packet function to free 'data' */
434         daemon_incoming_packet(client, data, cnt);
435 }
436
437 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
438                          uint16_t flags, void *private_data)
439 {
440         struct sockaddr_in addr;
441         socklen_t len;
442         int fd;
443         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
444         struct ctdb_client *client;
445
446         memset(&addr, 0, sizeof(addr));
447         len = sizeof(addr);
448         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
449         if (fd == -1) {
450                 return;
451         }
452         set_non_blocking(fd);
453
454         client = talloc_zero(ctdb, struct ctdb_client);
455         client->ctdb = ctdb;
456         client->fd = fd;
457
458         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
459                                          ctdb_daemon_read_cb, client);
460
461         talloc_set_destructor(client, ctdb_client_destructor);
462 }
463
464
465
466 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
467                          uint16_t flags, void *private_data)
468 {
469         int *fd = private_data;
470         int cnt;
471         char buf;
472
473         /* XXX this is a good place to try doing some cleaning up before exiting */
474         cnt = read(*fd, &buf, 1);
475         if (cnt==0) {
476                 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
477                 exit(1);
478         } else {
479                 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
480                 exit(1);
481         }
482 }
483
484
485
486 /*
487   create a unix domain socket and bind it
488   return a file descriptor open on the socket 
489 */
490 static int ux_socket_bind(struct ctdb_context *ctdb)
491 {
492         struct sockaddr_un addr;
493
494         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
495         if (ctdb->daemon.sd == -1) {
496                 ctdb->daemon.sd = -1;
497                 return -1;
498         }
499
500         set_non_blocking(ctdb->daemon.sd);
501
502         memset(&addr, 0, sizeof(addr));
503         addr.sun_family = AF_UNIX;
504         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
505
506         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
507                 close(ctdb->daemon.sd);
508                 ctdb->daemon.sd = -1;
509                 return -1;
510         }       
511         listen(ctdb->daemon.sd, 1);
512
513         return 0;
514 }
515
516 /*
517   delete the socket on exit - called on destruction of autofree context
518  */
519 static int unlink_destructor(const char *name)
520 {
521         unlink(name);
522         return 0;
523 }
524
525 /*
526   start the protocol going
527 */
528 int ctdb_start(struct ctdb_context *ctdb)
529 {
530         pid_t pid;
531         static int fd[2];
532         int res;
533         struct fd_event *fde;
534         const char *domain_socket_name;
535
536         /* generate a name to use for our local socket */
537         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
538         /* get rid of any old sockets */
539         unlink(ctdb->daemon.name);
540
541         /* create a unix domain stream socket to listen to */
542         res = ux_socket_bind(ctdb);
543         if (res!=0) {
544                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
545                 exit(10);
546         }
547
548         res = pipe(&fd[0]);
549         if (res) {
550                 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
551                 exit(1);
552         }
553         pid = fork();
554         if (pid==-1) {
555                 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
556                 exit(1);
557         }
558
559         if (pid) {
560                 close(fd[0]);
561                 close(ctdb->daemon.sd);
562                 ctdb->daemon.sd = -1;
563                 return 0;
564         }
565
566         block_signal(SIGPIPE);
567
568         /* ensure the socket is deleted on exit of the daemon */
569         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
570         talloc_set_destructor(domain_socket_name, unlink_destructor);   
571         
572         close(fd[1]);
573
574         ctdb->ev = event_context_init(NULL);
575         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
576         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
577         ctdb_main_loop(ctdb);
578
579         return 0;
580 }
581
582 /*
583   allocate a packet for use in client<->daemon communication
584  */
585 void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
586 {
587         int size;
588
589         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
590         return talloc_size(mem_ctx, size);
591 }
592
593 /*
594   called when a CTDB_REQ_FINISHED packet comes in
595 */
596 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
597 {
598         ctdb->num_finished++;
599 }