minor debug changes
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   structure describing a connected client in the daemon
34  */
35 struct ctdb_client {
36         struct ctdb_context *ctdb;
37         int fd;
38         struct ctdb_queue *queue;
39 };
40
41
42
43 static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
44
45 static void ctdb_main_loop(struct ctdb_context *ctdb)
46 {
47         ctdb->methods->start(ctdb);
48
49         /* go into a wait loop to allow other nodes to complete */
50         event_loop_wait(ctdb->ev);
51
52         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
53         exit(1);
54 }
55
56
57 static void set_non_blocking(int fd)
58 {
59         unsigned v;
60         v = fcntl(fd, F_GETFL, 0);
61         fcntl(fd, F_SETFL, v | O_NONBLOCK);
62 }
63
64 static void block_signal(int signum)
65 {
66         struct sigaction act;
67
68         memset(&act, 0, sizeof(act));
69
70         act.sa_handler = SIG_IGN;
71         sigemptyset(&act.sa_mask);
72         sigaddset(&act.sa_mask, signum);
73         sigaction(signum, &act, NULL);
74 }
75
76
77 /*
78   message handler for when we are in daemon mode. This redirects the message
79   to the right client
80  */
81 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
82                                     TDB_DATA data, void *private_data)
83 {
84         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
85         struct ctdb_req_message *r;
86         int len;
87
88         /* construct a message to send to the client containing the data */
89         len = offsetof(struct ctdb_req_message, data) + data.dsize;
90         r = ctdbd_allocate_pkt(ctdb, len);
91
92         talloc_set_name_const(r, "req_message packet");
93
94         memset(r, 0, offsetof(struct ctdb_req_message, data));
95
96         r->hdr.length    = len;
97         r->hdr.ctdb_magic = CTDB_MAGIC;
98         r->hdr.ctdb_version = CTDB_VERSION;
99         r->hdr.operation = CTDB_REQ_MESSAGE;
100         r->srvid         = srvid;
101         r->datalen       = data.dsize;
102         memcpy(&r->data[0], data.dptr, data.dsize);
103
104         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
105
106         talloc_free(r);
107 }
108                                            
109
110 /*
111   this is called when the ctdb daemon received a ctdb request to 
112   set the srvid from the client
113  */
114 static void daemon_request_register_message_handler(struct ctdb_client *client, 
115                                                     struct ctdb_req_register *c)
116 {
117         int res;
118         res = ctdb_register_message_handler(client->ctdb, client, 
119                                             c->srvid, daemon_message_handler, 
120                                             client);
121         if (res != 0) {
122                 DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", 
123                          c->srvid));
124         } else {
125                 DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", 
126                          c->srvid));
127         }
128 }
129
130
131 /*
132   called when the daemon gets a shutdown request from a client
133  */
134 static void daemon_request_shutdown(struct ctdb_client *client, 
135                                       struct ctdb_req_shutdown *f)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
138         int len;
139         uint32_t node;
140
141         /* we dont send to ourself so we can already count one daemon as
142            exiting */
143         ctdb->num_finished++;
144
145
146         /* loop over all nodes of the cluster */
147         for (node=0; node<ctdb->num_nodes;node++) {
148                 struct ctdb_req_finished *rf;
149
150                 /* dont send a message to ourself */
151                 if (ctdb->vnn == node) {
152                         continue;
153                 }
154
155                 len = sizeof(struct ctdb_req_finished);
156                 rf = ctdb->methods->allocate_pkt(ctdb, len);
157                 CTDB_NO_MEMORY_FATAL(ctdb, rf);
158                 talloc_set_name_const(rf, "ctdb_req_finished packet");
159
160                 ZERO_STRUCT(*rf);
161                 rf->hdr.length    = len;
162                 rf->hdr.ctdb_magic = CTDB_MAGIC;
163                 rf->hdr.ctdb_version = CTDB_VERSION;
164                 rf->hdr.operation = CTDB_REQ_FINISHED;
165                 rf->hdr.destnode  = node;
166                 rf->hdr.srcnode   = ctdb->vnn;
167                 rf->hdr.reqid     = 0;
168
169                 ctdb_queue_packet(ctdb, &(rf->hdr));
170
171                 talloc_free(rf);
172         }
173
174         /* wait until all nodes have are prepared to shutdown */
175         while (ctdb->num_finished != ctdb->num_nodes) {
176                 event_loop_once(ctdb->ev);
177         }
178
179         /* all daemons have requested to finish - we now exit */
180         DEBUG(1,("All daemons finished - exiting\n"));
181         _exit(0);
182 }
183
184
185
186 /*
187   called when the daemon gets a connect wait request from a client
188  */
189 static void daemon_request_connect_wait(struct ctdb_client *client, 
190                                         struct ctdb_req_connect_wait *c)
191 {
192         struct ctdb_reply_connect_wait r;
193         int res;
194
195         /* first wait - in the daemon */
196         ctdb_daemon_connect_wait(client->ctdb);
197
198         /* now send the reply */
199         ZERO_STRUCT(r);
200
201         r.hdr.length     = sizeof(r);
202         r.hdr.ctdb_magic = CTDB_MAGIC;
203         r.hdr.ctdb_version = CTDB_VERSION;
204         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
205         r.vnn           = ctdb_get_vnn(client->ctdb);
206         r.num_connected = client->ctdb->num_connected;
207         
208         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
209         if (res != 0) {
210                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
211                 return;
212         }
213 }
214
215 /*
216   destroy a ctdb_client
217 */
218 static int ctdb_client_destructor(struct ctdb_client *client)
219 {
220         close(client->fd);
221         client->fd = -1;
222         return 0;
223 }
224
225
226 /*
227   this is called when the ctdb daemon received a ctdb request message
228   from a local client over the unix domain socket
229  */
230 static void daemon_request_message_from_client(struct ctdb_client *client, 
231                                                struct ctdb_req_message *c)
232 {
233         TDB_DATA data;
234         int res;
235
236         /* maybe the message is for another client on this node */
237         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
238                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
239                 return;
240         }
241         
242         /* its for a remote node */
243         data.dptr = &c->data[0];
244         data.dsize = c->datalen;
245         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
246                                        c->srvid, data);
247         if (res != 0) {
248                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
249                          c->hdr.destnode));
250         }
251 }
252
253
254 struct daemon_call_state {
255         struct ctdb_client *client;
256         uint32_t reqid;
257         struct ctdb_call *call;
258 };
259
260 /* 
261    complete a call from a client 
262 */
263 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
264 {
265         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
266                                                            struct daemon_call_state);
267         struct ctdb_reply_call *r;
268         int res;
269         uint32_t length;
270         struct ctdb_client *client = dstate->client;
271
272         talloc_steal(client, dstate);
273         talloc_steal(dstate, dstate->call);
274
275         res = ctdb_daemon_call_recv(state, dstate->call);
276         if (res != 0) {
277                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
278                 return;
279         }
280
281         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
282         r = ctdbd_allocate_pkt(dstate, length);
283         if (r == NULL) {
284                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
285                 return;
286         }
287         memset(r, 0, offsetof(struct ctdb_reply_call, data));
288         r->hdr.length       = length;
289         r->hdr.ctdb_magic   = CTDB_MAGIC;
290         r->hdr.ctdb_version = CTDB_VERSION;
291         r->hdr.operation    = CTDB_REPLY_CALL;
292         r->hdr.reqid        = dstate->reqid;
293         r->datalen          = dstate->call->reply_data.dsize;
294         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
295
296         res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
297         if (res != 0) {
298                 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
299         }
300         talloc_free(dstate);
301 }
302
303
304 /*
305   this is called when the ctdb daemon received a ctdb request call
306   from a local client over the unix domain socket
307  */
308 static void daemon_request_call_from_client(struct ctdb_client *client, 
309                                             struct ctdb_req_call *c)
310 {
311         struct ctdb_call_state *state;
312         struct ctdb_db_context *ctdb_db;
313         struct daemon_call_state *dstate;
314         struct ctdb_call *call;
315         struct ctdb_ltdb_header header;
316         TDB_DATA key, data;
317         int ret;
318         struct ctdb_context *ctdb = client->ctdb;
319
320         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
321         if (!ctdb_db) {
322                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
323                           c->db_id));
324                 return;
325         }
326
327         key.dptr = c->data;
328         key.dsize = c->keylen;
329
330         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
331                                            (struct ctdb_req_header *)c, &data,
332                                            daemon_incoming_packet, client);
333         if (ret == -2) {
334                 /* will retry later */
335                 return;
336         }
337
338         if (ret != 0) {
339                 DEBUG(0,(__location__ " Unable to fetch record\n"));
340                 return;
341         }
342
343         dstate = talloc(client, struct daemon_call_state);
344         if (dstate == NULL) {
345                 ctdb_ltdb_unlock(ctdb_db, key);
346                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
347                 return;
348         }
349         dstate->client = client;
350         dstate->reqid  = c->hdr.reqid;
351         talloc_steal(dstate, data.dptr);
352
353         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
354         if (call == NULL) {
355                 ctdb_ltdb_unlock(ctdb_db, key);
356                 DEBUG(0,(__location__ " Unable to allocate call\n"));
357                 return;
358         }
359
360         call->call_id = c->callid;
361         call->key = key;
362         call->call_data.dptr = c->data + c->keylen;
363         call->call_data.dsize = c->calldatalen;
364
365         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
366                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
367         } else {
368                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
369         }
370
371         ctdb_ltdb_unlock(ctdb_db, key);
372
373         if (state == NULL) {
374                 DEBUG(0,(__location__ " Unable to setup call send\n"));
375                 return;
376         }
377         talloc_steal(state, dstate);
378         talloc_steal(client, state);
379
380         state->async.fn = daemon_call_from_client_callback;
381         state->async.private_data = dstate;
382 }
383
384 /* data contains a packet from the client */
385 static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
386 {
387         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
388         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
389         TALLOC_CTX *tmp_ctx;
390
391         /* place the packet as a child of a tmp_ctx. We then use
392            talloc_free() below to free it. If any of the calls want
393            to keep it, then they will steal it somewhere else, and the
394            talloc_free() will be a no-op */
395         tmp_ctx = talloc_new(client);
396         talloc_steal(tmp_ctx, hdr);
397
398         if (hdr->ctdb_magic != CTDB_MAGIC) {
399                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
400                 goto done;
401         }
402
403         if (hdr->ctdb_version != CTDB_VERSION) {
404                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
405                 goto done;
406         }
407
408         switch (hdr->operation) {
409         case CTDB_REQ_CALL:
410                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
411                 break;
412
413         case CTDB_REQ_REGISTER:
414                 daemon_request_register_message_handler(client, 
415                                                         (struct ctdb_req_register *)hdr);
416                 break;
417         case CTDB_REQ_MESSAGE:
418                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
419                 break;
420
421         case CTDB_REQ_CONNECT_WAIT:
422                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
423                 break;
424
425         case CTDB_REQ_SHUTDOWN:
426                 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
427                 break;
428
429         default:
430                 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
431                          hdr->operation));
432         }
433
434 done:
435         talloc_free(tmp_ctx);
436 }
437
438
439 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
440 {
441         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
442         struct ctdb_req_header *hdr;
443
444         if (cnt == 0) {
445                 talloc_free(client);
446                 return;
447         }
448
449         if (cnt < sizeof(*hdr)) {
450                 ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
451                 return;
452         }
453         hdr = (struct ctdb_req_header *)data;
454         if (cnt != hdr->length) {
455                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", 
456                                hdr->length, cnt);
457                 return;
458         }
459
460         if (hdr->ctdb_magic != CTDB_MAGIC) {
461                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
462                 return;
463         }
464
465         if (hdr->ctdb_version != CTDB_VERSION) {
466                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
467                 return;
468         }
469
470         /* it is the responsibility of the incoming packet function to free 'data' */
471         daemon_incoming_packet(client, data, cnt);
472 }
473
474 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
475                          uint16_t flags, void *private_data)
476 {
477         struct sockaddr_in addr;
478         socklen_t len;
479         int fd;
480         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
481         struct ctdb_client *client;
482
483         memset(&addr, 0, sizeof(addr));
484         len = sizeof(addr);
485         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
486         if (fd == -1) {
487                 return;
488         }
489         set_non_blocking(fd);
490
491         client = talloc_zero(ctdb, struct ctdb_client);
492         client->ctdb = ctdb;
493         client->fd = fd;
494
495         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
496                                          ctdb_daemon_read_cb, client);
497
498         talloc_set_destructor(client, ctdb_client_destructor);
499 }
500
501
502
503 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
504                          uint16_t flags, void *private_data)
505 {
506         int *fd = private_data;
507         int cnt;
508         char buf;
509
510         /* XXX this is a good place to try doing some cleaning up before exiting */
511         cnt = read(*fd, &buf, 1);
512         if (cnt==0) {
513                 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
514                 exit(1);
515         } else {
516                 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
517                 exit(1);
518         }
519 }
520
521
522
523 /*
524   create a unix domain socket and bind it
525   return a file descriptor open on the socket 
526 */
527 static int ux_socket_bind(struct ctdb_context *ctdb)
528 {
529         struct sockaddr_un addr;
530
531         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
532         if (ctdb->daemon.sd == -1) {
533                 ctdb->daemon.sd = -1;
534                 return -1;
535         }
536
537         set_non_blocking(ctdb->daemon.sd);
538
539         memset(&addr, 0, sizeof(addr));
540         addr.sun_family = AF_UNIX;
541         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
542
543         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
544                 close(ctdb->daemon.sd);
545                 ctdb->daemon.sd = -1;
546                 return -1;
547         }       
548         listen(ctdb->daemon.sd, 1);
549
550         return 0;
551 }
552
553 /*
554   delete the socket on exit - called on destruction of autofree context
555  */
556 static int unlink_destructor(const char *name)
557 {
558         unlink(name);
559         return 0;
560 }
561
562 /*
563   start the protocol going
564 */
565 int ctdb_start(struct ctdb_context *ctdb)
566 {
567         pid_t pid;
568         static int fd[2];
569         int res;
570         struct fd_event *fde;
571         const char *domain_socket_name;
572
573         /* generate a name to use for our local socket */
574         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
575         /* get rid of any old sockets */
576         unlink(ctdb->daemon.name);
577
578         /* create a unix domain stream socket to listen to */
579         res = ux_socket_bind(ctdb);
580         if (res!=0) {
581                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
582                 exit(10);
583         }
584
585         res = pipe(&fd[0]);
586         if (res) {
587                 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
588                 exit(1);
589         }
590         pid = fork();
591         if (pid==-1) {
592                 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
593                 exit(1);
594         }
595
596         if (pid) {
597                 close(fd[0]);
598                 close(ctdb->daemon.sd);
599                 ctdb->daemon.sd = -1;
600                 return 0;
601         }
602
603         block_signal(SIGPIPE);
604
605         /* ensure the socket is deleted on exit of the daemon */
606         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
607         talloc_set_destructor(domain_socket_name, unlink_destructor);   
608         
609         close(fd[1]);
610
611         ctdb->ev = event_context_init(NULL);
612         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
613         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
614         ctdb_main_loop(ctdb);
615
616         return 0;
617 }
618
619 /*
620   allocate a packet for use in client<->daemon communication
621  */
622 void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
623 {
624         int size;
625
626         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
627         return talloc_size(mem_ctx, size);
628 }
629
630 /*
631   called when a CTDB_REQ_FINISHED packet comes in
632 */
633 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
634 {
635         ctdb->num_finished++;
636 }