merged tridge's branch
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void ctdb_main_loop(struct ctdb_context *ctdb)
32 {
33         ctdb->methods->start(ctdb);
34
35         /* go into a wait loop to allow other nodes to complete */
36         event_loop_wait(ctdb->ev);
37
38         printf("event_loop_wait() returned. this should not happen\n");
39         exit(1);
40 }
41
42
43 static void set_non_blocking(int fd)
44 {
45         unsigned v;
46         v = fcntl(fd, F_GETFL, 0);
47         fcntl(fd, F_SETFL, v | O_NONBLOCK);
48 }
49
50
51 /*
52   structure describing a connected client in the daemon
53  */
54 struct ctdb_client {
55         struct ctdb_context *ctdb;
56         int fd;
57         struct ctdb_queue *queue;
58 };
59
60
61 /*
62   message handler for when we are in daemon mode. This redirects the message
63   to the right client
64  */
65 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
66                                     TDB_DATA data, void *private_data)
67 {
68         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
69         struct ctdb_req_message *r;
70         int len;
71
72         /* construct a message to send to the client containing the data */
73         len = offsetof(struct ctdb_req_message, data) + data.dsize;
74         r = ctdbd_allocate_pkt(ctdb, len);
75
76 /*XXX cant use this since it returns an int     CTDB_NO_MEMORY(ctdb, r);*/
77         talloc_set_name_const(r, "req_message packet");
78
79         ZERO_STRUCT(*r);
80
81         r->hdr.length    = len;
82         r->hdr.ctdb_magic = CTDB_MAGIC;
83         r->hdr.ctdb_version = CTDB_VERSION;
84         r->hdr.operation = CTDB_REQ_MESSAGE;
85         r->srvid         = srvid;
86         r->datalen       = data.dsize;
87         memcpy(&r->data[0], data.dptr, data.dsize);
88         
89         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
90
91         talloc_free(r);
92         return;
93 }
94                                            
95
96 /*
97   this is called when the ctdb daemon received a ctdb request to 
98   set the srvid from the client
99  */
100 static void daemon_request_register_message_handler(struct ctdb_client *client, 
101                                                     struct ctdb_req_register *c)
102 {
103         int res;
104         res = ctdb_register_message_handler(client->ctdb, client, 
105                                             c->srvid, daemon_message_handler, 
106                                             client);
107         if (res != 0) {
108                 printf("Failed to register handler %u in daemon\n", c->srvid);
109         }
110 }
111
112
113 static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
114                                                     TALLOC_CTX *mem_ctx, 
115                                                     TDB_DATA key, TDB_DATA *data)
116 {
117         struct ctdb_call *call;
118         struct ctdb_record_handle *rec;
119         struct ctdb_call_state *state;
120
121         rec = talloc(mem_ctx, struct ctdb_record_handle);
122         CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
123
124         
125         call = talloc(rec, struct ctdb_call);
126         ZERO_STRUCT(*call);
127         call->call_id = CTDB_FETCH_FUNC;
128         call->key = key;
129         call->flags = CTDB_IMMEDIATE_MIGRATION;
130
131
132         rec->ctdb_db = ctdb_db;
133         rec->key = key;
134         rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
135         rec->data = data;
136
137         state = ctdb_call_send(ctdb_db, call);
138         state->fetch_private = rec;
139
140         return state;
141 }
142
143 struct client_fetch_lock_data {
144         struct ctdb_client *client;
145         uint32_t reqid;
146 };
147 static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
148 {
149         struct ctdb_reply_fetch_lock *r;
150         struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data);
151         struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
152         int length, res;
153
154         length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
155         r = ctdbd_allocate_pkt(client->ctdb, length);
156         if (r == NULL) {
157                 printf("Failed to allocate reply_call in ctdb daemon\n");
158                 return;
159         }
160         ZERO_STRUCT(*r);
161         r->hdr.length       = length;
162         r->hdr.ctdb_magic   = CTDB_MAGIC;
163         r->hdr.ctdb_version = CTDB_VERSION;
164         r->hdr.operation    = CTDB_REPLY_FETCH_LOCK;
165         r->hdr.reqid        = data->reqid;
166         r->state            = state->state;
167         r->datalen          = state->call.reply_data.dsize;
168         memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
169
170         res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
171         if (res != 0) {
172                 printf("Failed to queue packet from daemon to client\n");
173         }
174         talloc_free(r);
175 }
176
177 /*
178   called when the daemon gets a fetch lock request from a client
179  */
180 static void daemon_request_fetch_lock(struct ctdb_client *client, 
181                                         struct ctdb_req_fetch_lock *f)
182 {
183         struct ctdb_call_state *state;
184         TDB_DATA key, *data;
185         struct ctdb_db_context *ctdb_db;
186         struct client_fetch_lock_data *fl_data;
187
188         ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
189
190         key.dsize = f->keylen;
191         key.dptr = &f->key[0];
192
193         data        = talloc(client, TDB_DATA);
194         data->dptr  = NULL;
195         data->dsize = 0;
196
197         state = ctdb_fetch_lock_send(ctdb_db, client, key, data);
198         talloc_steal(state, data);
199
200         fl_data = talloc(state, struct client_fetch_lock_data);
201         fl_data->client = client;
202         fl_data->reqid  = f->hdr.reqid;
203         state->async.fn = daemon_fetch_lock_complete;
204         state->async.private_data = fl_data;
205 }
206
207 /*
208   called when the daemon gets a store unlock request from a client
209
210   this would never block?
211  */
212 static void daemon_request_store_unlock(struct ctdb_client *client, 
213                                         struct ctdb_req_store_unlock *f)
214 {
215         struct ctdb_db_context *ctdb_db;
216         struct ctdb_reply_store_unlock r;
217         uint32_t caller = ctdb_get_vnn(client->ctdb);
218         struct ctdb_ltdb_header header;
219         TDB_DATA key, data;
220         int res;
221
222         ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
223
224         /* write the data to ltdb */
225         key.dsize = f->keylen;
226         key.dptr  = &f->data[0];
227         res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
228         if (res) {
229                 ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed");
230                 res = -1;
231                 goto done;
232         }
233         if (header.laccessor != caller) {
234                 header.lacount = 0;
235         }
236         header.laccessor = caller;
237         header.lacount++;
238         data.dsize = f->datalen;
239         data.dptr  = &f->data[f->keylen];
240         res = ctdb_ltdb_store(ctdb_db, key, &header, data);
241         if ( res != 0) {
242                 ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n");
243         }
244
245
246 done:
247         /* now send the reply */
248         ZERO_STRUCT(r);
249
250         r.hdr.length     = sizeof(r);
251         r.hdr.ctdb_magic = CTDB_MAGIC;
252         r.hdr.ctdb_version = CTDB_VERSION;
253         r.hdr.operation  = CTDB_REPLY_STORE_UNLOCK;
254         r.hdr.reqid      = f->hdr.reqid;
255         r.state          = res;
256         
257         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
258         if (res != 0) {
259                 printf("Failed to queue a store unlock response\n");
260                 return;
261         }
262 }
263
264 /*
265   called when the daemon gets a connect wait request from a client
266  */
267 static void daemon_request_connect_wait(struct ctdb_client *client, 
268                                         struct ctdb_req_connect_wait *c)
269 {
270         struct ctdb_reply_connect_wait r;
271         int res;
272
273         /* first wait - in the daemon */
274         ctdb_daemon_connect_wait(client->ctdb);
275
276         /* now send the reply */
277         ZERO_STRUCT(r);
278
279         r.hdr.length     = sizeof(r);
280         r.hdr.ctdb_magic = CTDB_MAGIC;
281         r.hdr.ctdb_version = CTDB_VERSION;
282         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
283         r.vnn           = ctdb_get_vnn(client->ctdb);
284         r.num_connected = client->ctdb->num_connected;
285         
286         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
287         if (res != 0) {
288                 printf("Failed to queue a connect wait response\n");
289                 return;
290         }
291 }
292
293 /*
294   destroy a ctdb_client
295 */
296 static int ctdb_client_destructor(struct ctdb_client *client)
297 {
298         close(client->fd);
299         client->fd = -1;
300         return 0;
301 }
302
303
304 /*
305   this is called when the ctdb daemon received a ctdb request message
306   from a local client over the unix domain socket
307  */
308 static void daemon_request_message_from_client(struct ctdb_client *client, 
309                                                struct ctdb_req_message *c)
310 {
311         TDB_DATA data;
312         int res;
313
314         /* maybe the message is for another client on this node */
315         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
316                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
317                 return;
318         }
319         
320         /* its for a remote node */
321         data.dptr = &c->data[0];
322         data.dsize = c->datalen;
323         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
324                                        c->srvid, data);
325         if (res != 0) {
326                 printf("Failed to send message to remote node %u\n",
327                        c->hdr.destnode);
328         }
329 }
330
331 /*
332   this is called when the ctdb daemon received a ctdb request call
333   from a local client over the unix domain socket
334  */
335 static void daemon_request_call_from_client(struct ctdb_client *client, 
336                                             struct ctdb_req_call *c)
337 {
338         struct ctdb_call_state *state;
339         struct ctdb_db_context *ctdb_db;
340         struct ctdb_call call;
341         struct ctdb_reply_call *r;
342         int res;
343         uint32_t length;
344
345         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
346         if (!ctdb_db) {
347                 printf("Unknown database in request. db_id==0x%08x",c->db_id);
348                 return;
349         }
350
351         ZERO_STRUCT(call);
352         call.call_id = c->callid;
353         call.key.dptr = c->data;
354         call.key.dsize = c->keylen;
355         call.call_data.dptr = c->data + c->keylen;
356         call.call_data.dsize = c->calldatalen;
357
358         state = ctdb_call_send(ctdb_db, &call);
359
360 /* XXX this must be converted to fully async */
361         res = ctdb_call_recv(state, &call);
362         if (res != 0) {
363                 printf("ctdbd_call_recv() returned error\n");
364                 exit(1);
365         }
366
367         length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
368         r = ctdbd_allocate_pkt(client->ctdb, length);
369         if (r == NULL) {
370                 printf("Failed to allocate reply_call in ctdb daemon\n");
371                 return;
372         }
373         ZERO_STRUCT(*r);
374         r->hdr.length       = length;
375         r->hdr.ctdb_magic   = CTDB_MAGIC;
376         r->hdr.ctdb_version = CTDB_VERSION;
377         r->hdr.operation    = CTDB_REPLY_CALL;
378         r->hdr.reqid        = c->hdr.reqid;
379         r->datalen          = call.reply_data.dsize;
380         memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
381
382         res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length);
383         if (res != 0) {
384                 printf("Failed to queue packet from daemon to client\n");
385         }
386         talloc_free(r);
387 }
388
389
390 /* data contains a packet from the client */
391 static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
392 {
393         struct ctdb_req_header *hdr = data;
394
395         if (hdr->ctdb_magic != CTDB_MAGIC) {
396                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
397                 goto done;
398         }
399
400         if (hdr->ctdb_version != CTDB_VERSION) {
401                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
402                 goto done;
403         }
404
405         switch (hdr->operation) {
406         case CTDB_REQ_CALL:
407                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
408                 break;
409
410         case CTDB_REQ_REGISTER:
411                 daemon_request_register_message_handler(client, 
412                                                         (struct ctdb_req_register *)hdr);
413                 break;
414         case CTDB_REQ_MESSAGE:
415                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
416                 break;
417
418         case CTDB_REQ_CONNECT_WAIT:
419                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
420                 break;
421         case CTDB_REQ_FETCH_LOCK:
422                 daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
423                 break;
424         case CTDB_REQ_STORE_UNLOCK:
425                 daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr);
426                 break;
427         default:
428                 printf("daemon: unrecognized operation:%d\n",hdr->operation);
429         }
430
431 done:
432         talloc_free(data);
433 }
434
435
436 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
437 {
438         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
439         struct ctdb_req_header *hdr;
440
441         if (cnt == 0) {
442                 talloc_free(client);
443                 return;
444         }
445
446         if (cnt < sizeof(*hdr)) {
447                 ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt);
448                 return;
449         }
450         hdr = (struct ctdb_req_header *)data;
451         if (cnt != hdr->length) {
452                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", 
453                                hdr->length, cnt);
454                 return;
455         }
456
457         if (hdr->ctdb_magic != CTDB_MAGIC) {
458                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
459                 return;
460         }
461
462         if (hdr->ctdb_version != CTDB_VERSION) {
463                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
464                 return;
465         }
466
467         /* it is the responsibility of the incoming packet function to free 'data' */
468         client_incoming_packet(client, data, cnt);
469 }
470
471 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
472                          uint16_t flags, void *private_data)
473 {
474         struct sockaddr_in addr;
475         socklen_t len;
476         int fd;
477         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
478         struct ctdb_client *client;
479
480         memset(&addr, 0, sizeof(addr));
481         len = sizeof(addr);
482         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
483         if (fd == -1) {
484                 return;
485         }
486         set_non_blocking(fd);
487
488         client = talloc_zero(ctdb, struct ctdb_client);
489         client->ctdb = ctdb;
490         client->fd = fd;
491
492         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
493                                          ctdb_client_read_cb, client);
494
495         talloc_set_destructor(client, ctdb_client_destructor);
496 }
497
498
499
500 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
501                          uint16_t flags, void *private_data)
502 {
503         int *fd = private_data;
504         int cnt;
505         char buf;
506
507         /* XXX this is a good place to try doing some cleaning up before exiting */
508         cnt = read(*fd, &buf, 1);
509         if (cnt==0) {
510                 printf("parent process exited. filedescriptor dissappeared\n");
511                 exit(1);
512         } else {
513                 printf("ctdb: did not expect data from parent process\n");
514                 exit(1);
515         }
516 }
517
518
519
520 /*
521   create a unix domain socket and bind it
522   return a file descriptor open on the socket 
523 */
524 static int ux_socket_bind(struct ctdb_context *ctdb)
525 {
526         struct sockaddr_un addr;
527
528         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
529         if (ctdb->daemon.sd == -1) {
530                 ctdb->daemon.sd = -1;
531                 return -1;
532         }
533
534         set_non_blocking(ctdb->daemon.sd);
535
536         memset(&addr, 0, sizeof(addr));
537         addr.sun_family = AF_UNIX;
538         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
539
540         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
541                 close(ctdb->daemon.sd);
542                 ctdb->daemon.sd = -1;
543                 return -1;
544         }       
545         listen(ctdb->daemon.sd, 1);
546
547         return 0;
548 }
549
550 /*
551   delete the socket on exit - called on destruction of autofree context
552  */
553 static int unlink_destructor(const char *name)
554 {
555         unlink(name);
556         return 0;
557 }
558
559 /*
560   start the protocol going
561 */
562 int ctdbd_start(struct ctdb_context *ctdb)
563 {
564         pid_t pid;
565         static int fd[2];
566         int res;
567         struct fd_event *fde;
568         const char *domain_socket_name;
569
570         /* generate a name to use for our local socket */
571         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
572         /* get rid of any old sockets */
573         unlink(ctdb->daemon.name);
574
575         /* create a unix domain stream socket to listen to */
576         res = ux_socket_bind(ctdb);
577         if (res!=0) {
578                 printf("Failed to open CTDB unix domain socket\n");
579                 exit(10);
580         }
581
582         res = pipe(&fd[0]);
583         if (res) {
584                 printf("Failed to open pipe for CTDB\n");
585                 exit(1);
586         }
587         pid = fork();
588         if (pid==-1) {
589                 printf("Failed to fork CTDB daemon\n");
590                 exit(1);
591         }
592
593         if (pid) {
594                 close(fd[0]);
595                 close(ctdb->daemon.sd);
596                 ctdb->daemon.sd = -1;
597                 return 0;
598         }
599
600         /* ensure the socket is deleted on exit of the daemon */
601         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
602         talloc_set_destructor(domain_socket_name, unlink_destructor);   
603         
604         close(fd[1]);
605         ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
606         ctdb->ev = event_context_init(NULL);
607         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
608         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
609         ctdb_main_loop(ctdb);
610
611         return 0;
612 }
613
614 /*
615   allocate a packet for use in client<->daemon communication
616  */
617 void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
618 {
619         int size;
620
621         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
622         return talloc_size(ctdb, size);
623 }
624
625 int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
626                              ctdb_message_fn_t handler,
627                              void *private_data)
628 {
629         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
630 }
631