private -> private_data for samba3
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void ctdb_main_loop(struct ctdb_context *ctdb)
32 {
33         ctdb->methods->start(ctdb);
34
35         /* go into a wait loop to allow other nodes to complete */
36         event_loop_wait(ctdb->ev);
37
38         printf("event_loop_wait() returned. this should not happen\n");
39         exit(1);
40 }
41
42
43 static void set_non_blocking(int fd)
44 {
45         unsigned v;
46         v = fcntl(fd, F_GETFL, 0);
47         fcntl(fd, F_SETFL, v | O_NONBLOCK);
48 }
49
50
51 /*
52   structure describing a connected client in the daemon
53  */
54 struct ctdb_client {
55         struct ctdb_context *ctdb;
56         int fd;
57         struct ctdb_queue *queue;
58 };
59
60
61 /*
62   message handler for when we are in daemon mode. This redirects the message
63   to the right client
64  */
65 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
66                                     TDB_DATA data, void *private_data)
67 {
68         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
69         struct ctdb_req_message *r;
70         int len;
71
72         /* construct a message to send to the client containing the data */
73         len = offsetof(struct ctdb_req_message, data) + data.dsize;
74         r = ctdbd_allocate_pkt(ctdb, len);
75
76 /*XXX cant use this since it returns an int     CTDB_NO_MEMORY(ctdb, r);*/
77         talloc_set_name_const(r, "req_message packet");
78
79         ZERO_STRUCT(*r);
80
81         r->hdr.length    = len;
82         r->hdr.ctdb_magic = CTDB_MAGIC;
83         r->hdr.ctdb_version = CTDB_VERSION;
84         r->hdr.operation = CTDB_REQ_MESSAGE;
85         r->srvid         = srvid;
86         r->datalen       = data.dsize;
87         memcpy(&r->data[0], data.dptr, data.dsize);
88         
89         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
90
91         talloc_free(r);
92         return;
93 }
94                                            
95
96 /*
97   this is called when the ctdb daemon received a ctdb request to 
98   set the srvid from the client
99  */
100 static void daemon_request_register_message_handler(struct ctdb_client *client, 
101                                                     struct ctdb_req_register *c)
102 {
103         int res;
104         res = ctdb_register_message_handler(client->ctdb, client, 
105                                             c->srvid, daemon_message_handler, 
106                                             client);
107         if (res != 0) {
108                 printf("Failed to register handler %u in daemon\n", c->srvid);
109         }
110 }
111
112
113 static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
114                                                     TALLOC_CTX *mem_ctx, 
115                                                     TDB_DATA key, TDB_DATA *data)
116 {
117         struct ctdb_call *call;
118         struct ctdb_record_handle *rec;
119         struct ctdb_call_state *state;
120
121         rec = talloc(mem_ctx, struct ctdb_record_handle);
122         CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
123
124         
125         call = talloc(rec, struct ctdb_call);
126         ZERO_STRUCT(*call);
127         call->call_id = CTDB_FETCH_FUNC;
128         call->key = key;
129         call->flags = CTDB_IMMEDIATE_MIGRATION;
130
131
132         rec->ctdb_db = ctdb_db;
133         rec->key = key;
134         rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
135         rec->data = data;
136
137         state = ctdb_call_send(ctdb_db, call);
138         state->fetch_private = rec;
139
140         return state;
141 }
142
143 struct client_fetch_lock_data {
144         struct ctdb_client *client;
145         uint32_t reqid;
146 };
147 static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
148 {
149         struct ctdb_reply_fetch_lock *r;
150         struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data);
151         struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
152         int length, res;
153
154         length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
155         r = ctdbd_allocate_pkt(client->ctdb, length);
156         if (r == NULL) {
157                 printf("Failed to allocate reply_call in ctdb daemon\n");
158                 return;
159         }
160         ZERO_STRUCT(*r);
161         r->hdr.length       = length;
162         r->hdr.ctdb_magic   = CTDB_MAGIC;
163         r->hdr.ctdb_version = CTDB_VERSION;
164         r->hdr.operation    = CTDB_REPLY_FETCH_LOCK;
165         r->hdr.reqid        = data->reqid;
166         r->state            = state->state;
167         r->datalen          = state->call.reply_data.dsize;
168         memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
169
170         res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
171         if (res != 0) {
172                 printf("Failed to queue packet from daemon to client\n");
173         }
174         talloc_free(r);
175 }
176
177 /*
178   called when the daemon gets a fetch lock request from a client
179  */
180 static void daemon_request_fetch_lock(struct ctdb_client *client, 
181                                         struct ctdb_req_fetch_lock *f)
182 {
183         struct ctdb_call_state *state;
184         TDB_DATA key, *data;
185         struct ctdb_db_context *ctdb_db;
186         struct client_fetch_lock_data *fl_data;
187
188         ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
189
190         key.dsize = f->keylen;
191         key.dptr = &f->key[0];
192
193         data        = talloc(client, TDB_DATA);
194         data->dptr  = NULL;
195         data->dsize = 0;
196
197         state = ctdb_fetch_lock_send(ctdb_db, client, key, data);
198         talloc_steal(state, data);
199
200         fl_data = talloc(state, struct client_fetch_lock_data);
201         fl_data->client = client;
202         fl_data->reqid  = f->hdr.reqid;
203         state->async.fn = daemon_fetch_lock_complete;
204         state->async.private_data = fl_data;
205 }
206
207 /*
208   called when the daemon gets a store unlock request from a client
209
210   this would never block?
211  */
212 static void daemon_request_store_unlock(struct ctdb_client *client, 
213                                         struct ctdb_req_store_unlock *f)
214 {
215         struct ctdb_db_context *ctdb_db;
216         struct ctdb_reply_store_unlock r;
217         int res;
218
219         ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
220         /* write the data to ltdb */
221 /*XXX*/
222
223         /* now send the reply */
224         ZERO_STRUCT(r);
225
226         r.hdr.length     = sizeof(r);
227         r.hdr.ctdb_magic = CTDB_MAGIC;
228         r.hdr.ctdb_version = CTDB_VERSION;
229         r.hdr.operation  = CTDB_REPLY_STORE_UNLOCK;
230         r.hdr.reqid      = f->hdr.reqid;
231         r.state          = CTDB_CALL_DONE;
232         
233         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
234         if (res != 0) {
235                 printf("Failed to queue a store unlock response\n");
236                 return;
237         }
238 }
239
240 /*
241   called when the daemon gets a connect wait request from a client
242  */
243 static void daemon_request_connect_wait(struct ctdb_client *client, 
244                                         struct ctdb_req_connect_wait *c)
245 {
246         struct ctdb_reply_connect_wait r;
247         int res;
248
249         /* first wait - in the daemon */
250         ctdb_daemon_connect_wait(client->ctdb);
251
252         /* now send the reply */
253         ZERO_STRUCT(r);
254
255         r.hdr.length     = sizeof(r);
256         r.hdr.ctdb_magic = CTDB_MAGIC;
257         r.hdr.ctdb_version = CTDB_VERSION;
258         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
259         r.vnn           = ctdb_get_vnn(client->ctdb);
260         r.num_connected = client->ctdb->num_connected;
261         
262         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
263         if (res != 0) {
264                 printf("Failed to queue a connect wait response\n");
265                 return;
266         }
267 }
268
269 /*
270   destroy a ctdb_client
271 */
272 static int ctdb_client_destructor(struct ctdb_client *client)
273 {
274         close(client->fd);
275         client->fd = -1;
276         return 0;
277 }
278
279
280 /*
281   this is called when the ctdb daemon received a ctdb request message
282   from a local client over the unix domain socket
283  */
284 static void daemon_request_message_from_client(struct ctdb_client *client, 
285                                                struct ctdb_req_message *c)
286 {
287         TDB_DATA data;
288         int res;
289
290         /* maybe the message is for another client on this node */
291         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
292                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
293                 return;
294         }
295         
296         /* its for a remote node */
297         data.dptr = &c->data[0];
298         data.dsize = c->datalen;
299         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
300                                        c->srvid, data);
301         if (res != 0) {
302                 printf("Failed to send message to remote node %u\n",
303                        c->hdr.destnode);
304         }
305 }
306
307 /*
308   this is called when the ctdb daemon received a ctdb request call
309   from a local client over the unix domain socket
310  */
311 static void daemon_request_call_from_client(struct ctdb_client *client, 
312                                             struct ctdb_req_call *c)
313 {
314         struct ctdb_call_state *state;
315         struct ctdb_db_context *ctdb_db;
316         struct ctdb_call call;
317         struct ctdb_reply_call *r;
318         int res;
319         uint32_t length;
320
321         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
322         if (!ctdb_db) {
323                 printf("Unknown database in request. db_id==0x%08x",c->db_id);
324                 return;
325         }
326
327         ZERO_STRUCT(call);
328         call.call_id = c->callid;
329         call.key.dptr = c->data;
330         call.key.dsize = c->keylen;
331         call.call_data.dptr = c->data + c->keylen;
332         call.call_data.dsize = c->calldatalen;
333
334         state = ctdb_call_send(ctdb_db, &call);
335
336 /* XXX this must be converted to fully async */
337         res = ctdb_call_recv(state, &call);
338         if (res != 0) {
339                 printf("ctdbd_call_recv() returned error\n");
340                 exit(1);
341         }
342
343         length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
344         r = ctdbd_allocate_pkt(client->ctdb, length);
345         if (r == NULL) {
346                 printf("Failed to allocate reply_call in ctdb daemon\n");
347                 return;
348         }
349         ZERO_STRUCT(*r);
350         r->hdr.length       = length;
351         r->hdr.ctdb_magic   = CTDB_MAGIC;
352         r->hdr.ctdb_version = CTDB_VERSION;
353         r->hdr.operation    = CTDB_REPLY_CALL;
354         r->hdr.reqid        = c->hdr.reqid;
355         r->datalen          = call.reply_data.dsize;
356         memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
357
358         res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length);
359         if (res != 0) {
360                 printf("Failed to queue packet from daemon to client\n");
361         }
362         talloc_free(r);
363 }
364
365
366 /* data contains a packet from the client */
367 static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
368 {
369         struct ctdb_req_header *hdr = data;
370
371         if (hdr->ctdb_magic != CTDB_MAGIC) {
372                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
373                 goto done;
374         }
375
376         if (hdr->ctdb_version != CTDB_VERSION) {
377                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
378                 goto done;
379         }
380
381         switch (hdr->operation) {
382         case CTDB_REQ_CALL:
383                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
384                 break;
385
386         case CTDB_REQ_REGISTER:
387                 daemon_request_register_message_handler(client, 
388                                                         (struct ctdb_req_register *)hdr);
389                 break;
390         case CTDB_REQ_MESSAGE:
391                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
392                 break;
393
394         case CTDB_REQ_CONNECT_WAIT:
395                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
396                 break;
397         case CTDB_REQ_FETCH_LOCK:
398                 daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
399                 break;
400         case CTDB_REQ_STORE_UNLOCK:
401                 daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr);
402                 break;
403         default:
404                 printf("daemon: unrecognized operation:%d\n",hdr->operation);
405         }
406
407 done:
408         talloc_free(data);
409 }
410
411
412 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
413 {
414         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
415         struct ctdb_req_header *hdr;
416
417         if (cnt == 0) {
418                 talloc_free(client);
419                 return;
420         }
421
422         if (cnt < sizeof(*hdr)) {
423                 ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt);
424                 return;
425         }
426         hdr = (struct ctdb_req_header *)data;
427         if (cnt != hdr->length) {
428                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", 
429                                hdr->length, cnt);
430                 return;
431         }
432
433         if (hdr->ctdb_magic != CTDB_MAGIC) {
434                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
435                 return;
436         }
437
438         if (hdr->ctdb_version != CTDB_VERSION) {
439                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
440                 return;
441         }
442
443         /* it is the responsibility of the incoming packet function to free 'data' */
444         client_incoming_packet(client, data, cnt);
445 }
446
447 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
448                          uint16_t flags, void *private_data)
449 {
450         struct sockaddr_in addr;
451         socklen_t len;
452         int fd;
453         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
454         struct ctdb_client *client;
455
456         memset(&addr, 0, sizeof(addr));
457         len = sizeof(addr);
458         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
459         if (fd == -1) {
460                 return;
461         }
462         set_non_blocking(fd);
463
464         client = talloc_zero(ctdb, struct ctdb_client);
465         client->ctdb = ctdb;
466         client->fd = fd;
467
468         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
469                                          ctdb_client_read_cb, client);
470
471         talloc_set_destructor(client, ctdb_client_destructor);
472 }
473
474
475
476 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
477                          uint16_t flags, void *private_data)
478 {
479         int *fd = private_data;
480         int cnt;
481         char buf;
482
483         /* XXX this is a good place to try doing some cleaning up before exiting */
484         cnt = read(*fd, &buf, 1);
485         if (cnt==0) {
486                 printf("parent process exited. filedescriptor dissappeared\n");
487                 exit(1);
488         } else {
489                 printf("ctdb: did not expect data from parent process\n");
490                 exit(1);
491         }
492 }
493
494
495
496 /*
497   create a unix domain socket and bind it
498   return a file descriptor open on the socket 
499 */
500 static int ux_socket_bind(struct ctdb_context *ctdb)
501 {
502         struct sockaddr_un addr;
503
504         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
505         if (ctdb->daemon.sd == -1) {
506                 ctdb->daemon.sd = -1;
507                 return -1;
508         }
509
510         set_non_blocking(ctdb->daemon.sd);
511
512         memset(&addr, 0, sizeof(addr));
513         addr.sun_family = AF_UNIX;
514         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
515
516         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
517                 close(ctdb->daemon.sd);
518                 ctdb->daemon.sd = -1;
519                 return -1;
520         }       
521         listen(ctdb->daemon.sd, 1);
522
523         return 0;
524 }
525
526 /*
527   delete the socket on exit - called on destruction of autofree context
528  */
529 static int unlink_destructor(const char *name)
530 {
531         unlink(name);
532         return 0;
533 }
534
535 /*
536   start the protocol going
537 */
538 int ctdbd_start(struct ctdb_context *ctdb)
539 {
540         pid_t pid;
541         static int fd[2];
542         int res;
543         struct fd_event *fde;
544         const char *domain_socket_name;
545
546         /* generate a name to use for our local socket */
547         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
548         /* get rid of any old sockets */
549         unlink(ctdb->daemon.name);
550
551         /* create a unix domain stream socket to listen to */
552         res = ux_socket_bind(ctdb);
553         if (res!=0) {
554                 printf("Failed to open CTDB unix domain socket\n");
555                 exit(10);
556         }
557
558         res = pipe(&fd[0]);
559         if (res) {
560                 printf("Failed to open pipe for CTDB\n");
561                 exit(1);
562         }
563         pid = fork();
564         if (pid==-1) {
565                 printf("Failed to fork CTDB daemon\n");
566                 exit(1);
567         }
568
569         if (pid) {
570                 close(fd[0]);
571                 close(ctdb->daemon.sd);
572                 ctdb->daemon.sd = -1;
573                 return 0;
574         }
575
576         /* ensure the socket is deleted on exit of the daemon */
577         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
578         talloc_set_destructor(domain_socket_name, unlink_destructor);   
579         
580         close(fd[1]);
581         ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
582         ctdb->ev = event_context_init(NULL);
583         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
584         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
585         ctdb_main_loop(ctdb);
586
587         return 0;
588 }
589
590 /*
591   allocate a packet for use in client<->daemon communication
592  */
593 void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
594 {
595         int size;
596
597         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
598         return talloc_size(ctdb, size);
599 }
600
601 int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
602                              ctdb_message_fn_t handler,
603                              void *private_data)
604 {
605         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
606 }
607