merged tridge's branch
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   structure describing a connected client in the daemon
34  */
35 struct ctdb_client {
36         struct ctdb_context *ctdb;
37         int fd;
38         struct ctdb_queue *queue;
39 };
40
41
42
43 static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
44
45 static void ctdb_main_loop(struct ctdb_context *ctdb)
46 {
47         /* we are the dispatcher process now, so start the protocol going */
48         if (ctdb_init_transport(ctdb)) {
49                 exit(1);
50         }
51
52         ctdb->methods->start(ctdb);
53
54         /* go into a wait loop to allow other nodes to complete */
55         event_loop_wait(ctdb->ev);
56
57         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
58         exit(1);
59 }
60
61
62 static void set_non_blocking(int fd)
63 {
64         unsigned v;
65         v = fcntl(fd, F_GETFL, 0);
66         fcntl(fd, F_SETFL, v | O_NONBLOCK);
67 }
68
69 static void block_signal(int signum)
70 {
71         struct sigaction act;
72
73         memset(&act, 0, sizeof(act));
74
75         act.sa_handler = SIG_IGN;
76         sigemptyset(&act.sa_mask);
77         sigaddset(&act.sa_mask, signum);
78         sigaction(signum, &act, NULL);
79 }
80
81
82 /*
83   message handler for when we are in daemon mode. This redirects the message
84   to the right client
85  */
86 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
87                                     TDB_DATA data, void *private_data)
88 {
89         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
90         struct ctdb_req_message *r;
91         int len;
92
93         /* construct a message to send to the client containing the data */
94         len = offsetof(struct ctdb_req_message, data) + data.dsize;
95         r = ctdbd_allocate_pkt(ctdb, len);
96
97         talloc_set_name_const(r, "req_message packet");
98
99         memset(r, 0, offsetof(struct ctdb_req_message, data));
100
101         r->hdr.length    = len;
102         r->hdr.ctdb_magic = CTDB_MAGIC;
103         r->hdr.ctdb_version = CTDB_VERSION;
104         r->hdr.operation = CTDB_REQ_MESSAGE;
105         r->srvid         = srvid;
106         r->datalen       = data.dsize;
107         memcpy(&r->data[0], data.dptr, data.dsize);
108
109         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
110
111         talloc_free(r);
112 }
113                                            
114
115 /*
116   this is called when the ctdb daemon received a ctdb request to 
117   set the srvid from the client
118  */
119 static void daemon_request_register_message_handler(struct ctdb_client *client, 
120                                                     struct ctdb_req_register *c)
121 {
122         int res;
123         res = ctdb_register_message_handler(client->ctdb, client, 
124                                             c->srvid, daemon_message_handler, 
125                                             client);
126         if (res != 0) {
127                 DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", 
128                          c->srvid));
129         } else {
130                 DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", 
131                          c->srvid));
132         }
133 }
134
135
136 /*
137   called when the daemon gets a shutdown request from a client
138  */
139 static void daemon_request_shutdown(struct ctdb_client *client, 
140                                       struct ctdb_req_shutdown *f)
141 {
142         struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
143         int len;
144         uint32_t node;
145
146         /* we dont send to ourself so we can already count one daemon as
147            exiting */
148         ctdb->num_finished++;
149
150
151         /* loop over all nodes of the cluster */
152         for (node=0; node<ctdb->num_nodes;node++) {
153                 struct ctdb_req_finished *rf;
154
155                 /* dont send a message to ourself */
156                 if (ctdb->vnn == node) {
157                         continue;
158                 }
159
160                 len = sizeof(struct ctdb_req_finished);
161                 rf = ctdb->methods->allocate_pkt(ctdb, len);
162                 CTDB_NO_MEMORY_FATAL(ctdb, rf);
163                 talloc_set_name_const(rf, "ctdb_req_finished packet");
164
165                 ZERO_STRUCT(*rf);
166                 rf->hdr.length    = len;
167                 rf->hdr.ctdb_magic = CTDB_MAGIC;
168                 rf->hdr.ctdb_version = CTDB_VERSION;
169                 rf->hdr.operation = CTDB_REQ_FINISHED;
170                 rf->hdr.destnode  = node;
171                 rf->hdr.srcnode   = ctdb->vnn;
172                 rf->hdr.reqid     = 0;
173
174                 ctdb_queue_packet(ctdb, &(rf->hdr));
175
176                 talloc_free(rf);
177         }
178
179         /* wait until all nodes have are prepared to shutdown */
180         while (ctdb->num_finished != ctdb->num_nodes) {
181                 event_loop_once(ctdb->ev);
182         }
183
184         /* all daemons have requested to finish - we now exit */
185         DEBUG(1,("All daemons finished - exiting\n"));
186         _exit(0);
187 }
188
189
190
191 /*
192   called when the daemon gets a connect wait request from a client
193  */
194 static void daemon_request_connect_wait(struct ctdb_client *client, 
195                                         struct ctdb_req_connect_wait *c)
196 {
197         struct ctdb_reply_connect_wait r;
198         int res;
199
200         /* first wait - in the daemon */
201         ctdb_daemon_connect_wait(client->ctdb);
202
203         /* now send the reply */
204         ZERO_STRUCT(r);
205
206         r.hdr.length     = sizeof(r);
207         r.hdr.ctdb_magic = CTDB_MAGIC;
208         r.hdr.ctdb_version = CTDB_VERSION;
209         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
210         r.vnn           = ctdb_get_vnn(client->ctdb);
211         r.num_connected = client->ctdb->num_connected;
212         
213         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
214         if (res != 0) {
215                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
216                 return;
217         }
218 }
219
220 /*
221   destroy a ctdb_client
222 */
223 static int ctdb_client_destructor(struct ctdb_client *client)
224 {
225         close(client->fd);
226         client->fd = -1;
227         return 0;
228 }
229
230
231 /*
232   this is called when the ctdb daemon received a ctdb request message
233   from a local client over the unix domain socket
234  */
235 static void daemon_request_message_from_client(struct ctdb_client *client, 
236                                                struct ctdb_req_message *c)
237 {
238         TDB_DATA data;
239         int res;
240
241         /* maybe the message is for another client on this node */
242         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
243                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
244                 return;
245         }
246         
247         /* its for a remote node */
248         data.dptr = &c->data[0];
249         data.dsize = c->datalen;
250         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
251                                        c->srvid, data);
252         if (res != 0) {
253                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
254                          c->hdr.destnode));
255         }
256 }
257
258
259 struct daemon_call_state {
260         struct ctdb_client *client;
261         uint32_t reqid;
262         struct ctdb_call *call;
263 };
264
265 /* 
266    complete a call from a client 
267 */
268 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
269 {
270         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
271                                                            struct daemon_call_state);
272         struct ctdb_reply_call *r;
273         int res;
274         uint32_t length;
275         struct ctdb_client *client = dstate->client;
276
277         talloc_steal(client, dstate);
278         talloc_steal(dstate, dstate->call);
279
280         res = ctdb_daemon_call_recv(state, dstate->call);
281         if (res != 0) {
282                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
283                 return;
284         }
285
286         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
287         r = ctdbd_allocate_pkt(dstate, length);
288         if (r == NULL) {
289                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
290                 return;
291         }
292         memset(r, 0, offsetof(struct ctdb_reply_call, data));
293         r->hdr.length       = length;
294         r->hdr.ctdb_magic   = CTDB_MAGIC;
295         r->hdr.ctdb_version = CTDB_VERSION;
296         r->hdr.operation    = CTDB_REPLY_CALL;
297         r->hdr.reqid        = dstate->reqid;
298         r->datalen          = dstate->call->reply_data.dsize;
299         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
300
301         res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
302         if (res != 0) {
303                 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
304         }
305         talloc_free(dstate);
306 }
307
308
309 /*
310   this is called when the ctdb daemon received a ctdb request call
311   from a local client over the unix domain socket
312  */
313 static void daemon_request_call_from_client(struct ctdb_client *client, 
314                                             struct ctdb_req_call *c)
315 {
316         struct ctdb_call_state *state;
317         struct ctdb_db_context *ctdb_db;
318         struct daemon_call_state *dstate;
319         struct ctdb_call *call;
320         struct ctdb_ltdb_header header;
321         TDB_DATA key, data;
322         int ret;
323         struct ctdb_context *ctdb = client->ctdb;
324
325         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
326         if (!ctdb_db) {
327                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
328                           c->db_id));
329                 return;
330         }
331
332         key.dptr = c->data;
333         key.dsize = c->keylen;
334
335         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
336                                            (struct ctdb_req_header *)c, &data,
337                                            daemon_incoming_packet, client);
338         if (ret == -2) {
339                 /* will retry later */
340                 return;
341         }
342
343         if (ret != 0) {
344                 DEBUG(0,(__location__ " Unable to fetch record\n"));
345                 return;
346         }
347
348         dstate = talloc(client, struct daemon_call_state);
349         if (dstate == NULL) {
350                 ctdb_ltdb_unlock(ctdb_db, key);
351                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
352                 return;
353         }
354         dstate->client = client;
355         dstate->reqid  = c->hdr.reqid;
356         talloc_steal(dstate, data.dptr);
357
358         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
359         if (call == NULL) {
360                 ctdb_ltdb_unlock(ctdb_db, key);
361                 DEBUG(0,(__location__ " Unable to allocate call\n"));
362                 return;
363         }
364
365         call->call_id = c->callid;
366         call->key = key;
367         call->call_data.dptr = c->data + c->keylen;
368         call->call_data.dsize = c->calldatalen;
369         call->flags = c->flags;
370
371         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
372                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
373         } else {
374                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
375         }
376
377         ctdb_ltdb_unlock(ctdb_db, key);
378
379         if (state == NULL) {
380                 DEBUG(0,(__location__ " Unable to setup call send\n"));
381                 return;
382         }
383         talloc_steal(state, dstate);
384         talloc_steal(client, state);
385
386         state->async.fn = daemon_call_from_client_callback;
387         state->async.private_data = dstate;
388 }
389
390 /* data contains a packet from the client */
391 static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
392 {
393         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
394         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
395         TALLOC_CTX *tmp_ctx;
396
397         /* place the packet as a child of a tmp_ctx. We then use
398            talloc_free() below to free it. If any of the calls want
399            to keep it, then they will steal it somewhere else, and the
400            talloc_free() will be a no-op */
401         tmp_ctx = talloc_new(client);
402         talloc_steal(tmp_ctx, hdr);
403
404         if (hdr->ctdb_magic != CTDB_MAGIC) {
405                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
406                 goto done;
407         }
408
409         if (hdr->ctdb_version != CTDB_VERSION) {
410                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
411                 goto done;
412         }
413
414         switch (hdr->operation) {
415         case CTDB_REQ_CALL:
416                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
417                 break;
418
419         case CTDB_REQ_REGISTER:
420                 daemon_request_register_message_handler(client, 
421                                                         (struct ctdb_req_register *)hdr);
422                 break;
423         case CTDB_REQ_MESSAGE:
424                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
425                 break;
426
427         case CTDB_REQ_CONNECT_WAIT:
428                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
429                 break;
430
431         case CTDB_REQ_SHUTDOWN:
432                 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
433                 break;
434
435         default:
436                 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
437                          hdr->operation));
438         }
439
440 done:
441         talloc_free(tmp_ctx);
442 }
443
444
445 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
446 {
447         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
448         struct ctdb_req_header *hdr;
449
450         if (cnt == 0) {
451                 talloc_free(client);
452                 return;
453         }
454
455         if (cnt < sizeof(*hdr)) {
456                 ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
457                 return;
458         }
459         hdr = (struct ctdb_req_header *)data;
460         if (cnt != hdr->length) {
461                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", 
462                                hdr->length, cnt);
463                 return;
464         }
465
466         if (hdr->ctdb_magic != CTDB_MAGIC) {
467                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
468                 return;
469         }
470
471         if (hdr->ctdb_version != CTDB_VERSION) {
472                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
473                 return;
474         }
475
476         /* it is the responsibility of the incoming packet function to free 'data' */
477         daemon_incoming_packet(client, data, cnt);
478 }
479
480 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
481                          uint16_t flags, void *private_data)
482 {
483         struct sockaddr_in addr;
484         socklen_t len;
485         int fd;
486         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
487         struct ctdb_client *client;
488
489         memset(&addr, 0, sizeof(addr));
490         len = sizeof(addr);
491         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
492         if (fd == -1) {
493                 return;
494         }
495         set_non_blocking(fd);
496
497         client = talloc_zero(ctdb, struct ctdb_client);
498         client->ctdb = ctdb;
499         client->fd = fd;
500
501         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
502                                          ctdb_daemon_read_cb, client);
503
504         talloc_set_destructor(client, ctdb_client_destructor);
505 }
506
507
508
509 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
510                          uint16_t flags, void *private_data)
511 {
512         int *fd = private_data;
513         int cnt;
514         char buf;
515
516         /* XXX this is a good place to try doing some cleaning up before exiting */
517         cnt = read(*fd, &buf, 1);
518         if (cnt==0) {
519                 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
520                 exit(1);
521         } else {
522                 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
523                 exit(1);
524         }
525 }
526
527
528
529 /*
530   create a unix domain socket and bind it
531   return a file descriptor open on the socket 
532 */
533 static int ux_socket_bind(struct ctdb_context *ctdb)
534 {
535         struct sockaddr_un addr;
536
537         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
538         if (ctdb->daemon.sd == -1) {
539                 ctdb->daemon.sd = -1;
540                 return -1;
541         }
542
543         set_non_blocking(ctdb->daemon.sd);
544
545         memset(&addr, 0, sizeof(addr));
546         addr.sun_family = AF_UNIX;
547         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
548
549         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
550                 close(ctdb->daemon.sd);
551                 ctdb->daemon.sd = -1;
552                 return -1;
553         }       
554         listen(ctdb->daemon.sd, 1);
555
556         return 0;
557 }
558
559 /*
560   delete the socket on exit - called on destruction of autofree context
561  */
562 static int unlink_destructor(const char *name)
563 {
564         unlink(name);
565         return 0;
566 }
567
568 /*
569   start the protocol going
570 */
571 int ctdb_start(struct ctdb_context *ctdb)
572 {
573         pid_t pid;
574         static int fd[2];
575         int res;
576         struct fd_event *fde;
577         const char *domain_socket_name;
578
579         /* generate a name to use for our local socket */
580         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
581         /* get rid of any old sockets */
582         unlink(ctdb->daemon.name);
583
584         /* create a unix domain stream socket to listen to */
585         res = ux_socket_bind(ctdb);
586         if (res!=0) {
587                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
588                 exit(10);
589         }
590
591         res = pipe(&fd[0]);
592         if (res) {
593                 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
594                 exit(1);
595         }
596         pid = fork();
597         if (pid==-1) {
598                 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
599                 exit(1);
600         }
601
602         if (pid) {
603                 close(fd[0]);
604                 close(ctdb->daemon.sd);
605                 ctdb->daemon.sd = -1;
606
607                 /* Added because of ctdb->methods->allocate_pkt calls */
608                 /* TODO: clean */
609                 int ctdb_tcp_init(struct ctdb_context *ctdb);
610                 ctdb_tcp_init(ctdb);
611
612                 return 0;
613         }
614
615         block_signal(SIGPIPE);
616
617         /* ensure the socket is deleted on exit of the daemon */
618         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
619         talloc_set_destructor(domain_socket_name, unlink_destructor);   
620         
621         close(fd[1]);
622
623         ctdb->ev = event_context_init(NULL);
624         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
625         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
626         ctdb_main_loop(ctdb);
627
628         return 0;
629 }
630
631 /*
632   allocate a packet for use in client<->daemon communication
633  */
634 void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
635 {
636         int size;
637
638         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
639         return talloc_size(mem_ctx, size);
640 }
641
642 /*
643   called when a CTDB_REQ_FINISHED packet comes in
644 */
645 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
646 {
647         ctdb->num_finished++;
648 }