initial change to remove store_unlock pdu and use tdb chainlock in the client
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void ctdb_main_loop(struct ctdb_context *ctdb)
32 {
33         ctdb->methods->start(ctdb);
34
35         /* go into a wait loop to allow other nodes to complete */
36         event_loop_wait(ctdb->ev);
37
38         printf("event_loop_wait() returned. this should not happen\n");
39         exit(1);
40 }
41
42
43 static void set_non_blocking(int fd)
44 {
45         unsigned v;
46         v = fcntl(fd, F_GETFL, 0);
47         fcntl(fd, F_SETFL, v | O_NONBLOCK);
48 }
49
50
51 /*
52   structure describing a connected client in the daemon
53  */
54 struct ctdb_client {
55         struct ctdb_context *ctdb;
56         int fd;
57         struct ctdb_queue *queue;
58 };
59
60
61 /*
62   message handler for when we are in daemon mode. This redirects the message
63   to the right client
64  */
65 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
66                                     TDB_DATA data, void *private_data)
67 {
68         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
69         struct ctdb_req_message *r;
70         int len;
71
72         /* construct a message to send to the client containing the data */
73         len = offsetof(struct ctdb_req_message, data) + data.dsize;
74         r = ctdbd_allocate_pkt(ctdb, len);
75
76 /*XXX cant use this since it returns an int     CTDB_NO_MEMORY(ctdb, r);*/
77         talloc_set_name_const(r, "req_message packet");
78
79         ZERO_STRUCT(*r);
80
81         r->hdr.length    = len;
82         r->hdr.ctdb_magic = CTDB_MAGIC;
83         r->hdr.ctdb_version = CTDB_VERSION;
84         r->hdr.operation = CTDB_REQ_MESSAGE;
85         r->srvid         = srvid;
86         r->datalen       = data.dsize;
87         memcpy(&r->data[0], data.dptr, data.dsize);
88
89         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
90
91         talloc_free(r);
92         return;
93 }
94                                            
95
96 /*
97   this is called when the ctdb daemon received a ctdb request to 
98   set the srvid from the client
99  */
100 static void daemon_request_register_message_handler(struct ctdb_client *client, 
101                                                     struct ctdb_req_register *c)
102 {
103         int res;
104         res = ctdb_register_message_handler(client->ctdb, client, 
105                                             c->srvid, daemon_message_handler, 
106                                             client);
107         if (res != 0) {
108                 printf("Failed to register handler %u in daemon\n", c->srvid);
109         }
110 }
111
112
113 static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
114                                                     TALLOC_CTX *mem_ctx, 
115                                                     TDB_DATA key, TDB_DATA *data)
116 {
117         struct ctdb_call *call;
118         struct ctdb_record_handle *rec;
119         struct ctdb_call_state *state;
120
121         rec = talloc(mem_ctx, struct ctdb_record_handle);
122         CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
123
124         
125         call = talloc(rec, struct ctdb_call);
126         ZERO_STRUCT(*call);
127         call->call_id = CTDB_FETCH_FUNC;
128         call->key = key;
129         call->flags = CTDB_IMMEDIATE_MIGRATION;
130
131
132         rec->ctdb_db = ctdb_db;
133         rec->key = key;
134         rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
135         rec->data = data;
136
137         state = ctdb_call_send(ctdb_db, call);
138         state->fetch_private = rec;
139
140         return state;
141 }
142
143 struct client_fetch_lock_data {
144         struct ctdb_client *client;
145         uint32_t reqid;
146 };
147 static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
148 {
149         struct ctdb_reply_fetch_lock *r;
150         struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data);
151         struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
152         int length, res;
153
154         length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
155         r = ctdbd_allocate_pkt(client->ctdb, length);
156         if (r == NULL) {
157                 printf("Failed to allocate reply_call in ctdb daemon\n");
158                 return;
159         }
160         ZERO_STRUCT(*r);
161         r->hdr.length       = length;
162         r->hdr.ctdb_magic   = CTDB_MAGIC;
163         r->hdr.ctdb_version = CTDB_VERSION;
164         r->hdr.operation    = CTDB_REPLY_FETCH_LOCK;
165         r->hdr.reqid        = data->reqid;
166         r->state            = state->state;
167         r->datalen          = state->call.reply_data.dsize;
168         memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
169
170         res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
171         if (res != 0) {
172                 printf("Failed to queue packet from daemon to client\n");
173         }
174         talloc_free(r);
175 }
176
177 /*
178   called when the daemon gets a fetch lock request from a client
179  */
180 static void daemon_request_fetch_lock(struct ctdb_client *client, 
181                                         struct ctdb_req_fetch_lock *f)
182 {
183         struct ctdb_call_state *state;
184         TDB_DATA key, *data;
185         struct ctdb_db_context *ctdb_db;
186         struct client_fetch_lock_data *fl_data;
187
188         ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
189
190         key.dsize = f->keylen;
191         key.dptr = &f->key[0];
192
193         data        = talloc(client, TDB_DATA);
194         data->dptr  = NULL;
195         data->dsize = 0;
196
197         state = ctdb_fetch_lock_send(ctdb_db, client, key, data);
198         talloc_steal(state, data);
199
200         fl_data = talloc(state, struct client_fetch_lock_data);
201         fl_data->client = client;
202         fl_data->reqid  = f->hdr.reqid;
203         state->async.fn = daemon_fetch_lock_complete;
204         state->async.private_data = fl_data;
205 }
206
207 /*
208   called when the daemon gets a connect wait request from a client
209  */
210 static void daemon_request_connect_wait(struct ctdb_client *client, 
211                                         struct ctdb_req_connect_wait *c)
212 {
213         struct ctdb_reply_connect_wait r;
214         int res;
215
216         /* first wait - in the daemon */
217         ctdb_daemon_connect_wait(client->ctdb);
218
219         /* now send the reply */
220         ZERO_STRUCT(r);
221
222         r.hdr.length     = sizeof(r);
223         r.hdr.ctdb_magic = CTDB_MAGIC;
224         r.hdr.ctdb_version = CTDB_VERSION;
225         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
226         r.vnn           = ctdb_get_vnn(client->ctdb);
227         r.num_connected = client->ctdb->num_connected;
228         
229         res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
230         if (res != 0) {
231                 printf("Failed to queue a connect wait response\n");
232                 return;
233         }
234 }
235
236 /*
237   destroy a ctdb_client
238 */
239 static int ctdb_client_destructor(struct ctdb_client *client)
240 {
241         close(client->fd);
242         client->fd = -1;
243         return 0;
244 }
245
246
247 /*
248   this is called when the ctdb daemon received a ctdb request message
249   from a local client over the unix domain socket
250  */
251 static void daemon_request_message_from_client(struct ctdb_client *client, 
252                                                struct ctdb_req_message *c)
253 {
254         TDB_DATA data;
255         int res;
256
257         /* maybe the message is for another client on this node */
258         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
259                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
260                 return;
261         }
262         
263         /* its for a remote node */
264         data.dptr = &c->data[0];
265         data.dsize = c->datalen;
266         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
267                                        c->srvid, data);
268         if (res != 0) {
269                 printf("Failed to send message to remote node %u\n",
270                        c->hdr.destnode);
271         }
272 }
273
274 /*
275   this is called when the ctdb daemon received a ctdb request call
276   from a local client over the unix domain socket
277  */
278 static void daemon_request_call_from_client(struct ctdb_client *client, 
279                                             struct ctdb_req_call *c)
280 {
281         struct ctdb_call_state *state;
282         struct ctdb_db_context *ctdb_db;
283         struct ctdb_call call;
284         struct ctdb_reply_call *r;
285         int res;
286         uint32_t length;
287
288         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
289         if (!ctdb_db) {
290                 printf("Unknown database in request. db_id==0x%08x",c->db_id);
291                 return;
292         }
293
294         ZERO_STRUCT(call);
295         call.call_id = c->callid;
296         call.key.dptr = c->data;
297         call.key.dsize = c->keylen;
298         call.call_data.dptr = c->data + c->keylen;
299         call.call_data.dsize = c->calldatalen;
300
301         state = ctdb_call_send(ctdb_db, &call);
302
303 /* XXX this must be converted to fully async */
304         res = ctdb_call_recv(state, &call);
305         if (res != 0) {
306                 printf("ctdbd_call_recv() returned error\n");
307                 exit(1);
308         }
309
310         length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
311         r = ctdbd_allocate_pkt(client->ctdb, length);
312         if (r == NULL) {
313                 printf("Failed to allocate reply_call in ctdb daemon\n");
314                 return;
315         }
316         ZERO_STRUCT(*r);
317         r->hdr.length       = length;
318         r->hdr.ctdb_magic   = CTDB_MAGIC;
319         r->hdr.ctdb_version = CTDB_VERSION;
320         r->hdr.operation    = CTDB_REPLY_CALL;
321         r->hdr.reqid        = c->hdr.reqid;
322         r->datalen          = call.reply_data.dsize;
323         memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
324
325         res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length);
326         if (res != 0) {
327                 printf("Failed to queue packet from daemon to client\n");
328         }
329         talloc_free(r);
330 }
331
332
333 /* data contains a packet from the client */
334 static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
335 {
336         struct ctdb_req_header *hdr = data;
337
338         if (hdr->ctdb_magic != CTDB_MAGIC) {
339                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
340                 goto done;
341         }
342
343         if (hdr->ctdb_version != CTDB_VERSION) {
344                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
345                 goto done;
346         }
347
348         switch (hdr->operation) {
349         case CTDB_REQ_CALL:
350                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
351                 break;
352
353         case CTDB_REQ_REGISTER:
354                 daemon_request_register_message_handler(client, 
355                                                         (struct ctdb_req_register *)hdr);
356                 break;
357         case CTDB_REQ_MESSAGE:
358                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
359                 break;
360
361         case CTDB_REQ_CONNECT_WAIT:
362                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
363                 break;
364         case CTDB_REQ_FETCH_LOCK:
365                 daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
366                 break;
367         default:
368                 printf("daemon: unrecognized operation:%d\n",hdr->operation);
369         }
370
371 done:
372         talloc_free(data);
373 }
374
375
376 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
377 {
378         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
379         struct ctdb_req_header *hdr;
380
381         if (cnt == 0) {
382                 talloc_free(client);
383                 return;
384         }
385
386         if (cnt < sizeof(*hdr)) {
387                 ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
388                 return;
389         }
390         hdr = (struct ctdb_req_header *)data;
391         if (cnt != hdr->length) {
392                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", 
393                                hdr->length, cnt);
394                 return;
395         }
396
397         if (hdr->ctdb_magic != CTDB_MAGIC) {
398                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
399                 return;
400         }
401
402         if (hdr->ctdb_version != CTDB_VERSION) {
403                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
404                 return;
405         }
406
407         /* it is the responsibility of the incoming packet function to free 'data' */
408         client_incoming_packet(client, data, cnt);
409 }
410
411 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
412                          uint16_t flags, void *private_data)
413 {
414         struct sockaddr_in addr;
415         socklen_t len;
416         int fd;
417         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
418         struct ctdb_client *client;
419
420         memset(&addr, 0, sizeof(addr));
421         len = sizeof(addr);
422         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
423         if (fd == -1) {
424                 return;
425         }
426         set_non_blocking(fd);
427
428         client = talloc_zero(ctdb, struct ctdb_client);
429         client->ctdb = ctdb;
430         client->fd = fd;
431
432         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
433                                          ctdb_client_read_cb, client);
434
435         talloc_set_destructor(client, ctdb_client_destructor);
436 }
437
438
439
440 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
441                          uint16_t flags, void *private_data)
442 {
443         int *fd = private_data;
444         int cnt;
445         char buf;
446
447         /* XXX this is a good place to try doing some cleaning up before exiting */
448         cnt = read(*fd, &buf, 1);
449         if (cnt==0) {
450                 printf("parent process exited. filedescriptor dissappeared\n");
451                 exit(1);
452         } else {
453                 printf("ctdb: did not expect data from parent process\n");
454                 exit(1);
455         }
456 }
457
458
459
460 /*
461   create a unix domain socket and bind it
462   return a file descriptor open on the socket 
463 */
464 static int ux_socket_bind(struct ctdb_context *ctdb)
465 {
466         struct sockaddr_un addr;
467
468         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
469         if (ctdb->daemon.sd == -1) {
470                 ctdb->daemon.sd = -1;
471                 return -1;
472         }
473
474         set_non_blocking(ctdb->daemon.sd);
475
476         memset(&addr, 0, sizeof(addr));
477         addr.sun_family = AF_UNIX;
478         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
479
480         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
481                 close(ctdb->daemon.sd);
482                 ctdb->daemon.sd = -1;
483                 return -1;
484         }       
485         listen(ctdb->daemon.sd, 1);
486
487         return 0;
488 }
489
490 /*
491   delete the socket on exit - called on destruction of autofree context
492  */
493 static int unlink_destructor(const char *name)
494 {
495         unlink(name);
496         return 0;
497 }
498
499 /*
500   start the protocol going
501 */
502 int ctdbd_start(struct ctdb_context *ctdb)
503 {
504         pid_t pid;
505         static int fd[2];
506         int res;
507         struct fd_event *fde;
508         const char *domain_socket_name;
509
510         /* generate a name to use for our local socket */
511         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
512         /* get rid of any old sockets */
513         unlink(ctdb->daemon.name);
514
515         /* create a unix domain stream socket to listen to */
516         res = ux_socket_bind(ctdb);
517         if (res!=0) {
518                 printf("Failed to open CTDB unix domain socket\n");
519                 exit(10);
520         }
521
522         res = pipe(&fd[0]);
523         if (res) {
524                 printf("Failed to open pipe for CTDB\n");
525                 exit(1);
526         }
527         pid = fork();
528         if (pid==-1) {
529                 printf("Failed to fork CTDB daemon\n");
530                 exit(1);
531         }
532
533         if (pid) {
534                 close(fd[0]);
535                 close(ctdb->daemon.sd);
536                 ctdb->daemon.sd = -1;
537                 return 0;
538         }
539
540         /* ensure the socket is deleted on exit of the daemon */
541         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
542         talloc_set_destructor(domain_socket_name, unlink_destructor);   
543         
544         close(fd[1]);
545         ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
546         ctdb->ev = event_context_init(NULL);
547         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
548         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
549         ctdb_main_loop(ctdb);
550
551         return 0;
552 }
553
554 /*
555   allocate a packet for use in client<->daemon communication
556  */
557 void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
558 {
559         int size;
560
561         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
562         return talloc_size(ctdb, size);
563 }
564
565 int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
566                              ctdb_message_fn_t handler,
567                              void *private_data)
568 {
569         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
570 }
571