merge from ronnie, plus complete the client side of inter-node messaging
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void ctdb_main_loop(struct ctdb_context *ctdb)
32 {
33         ctdb->methods->start(ctdb);
34
35         /* go into a wait loop to allow other nodes to complete */
36         event_loop_wait(ctdb->ev);
37
38         printf("event_loop_wait() returned. this should not happen\n");
39         exit(1);
40 }
41
42
43 static void set_non_blocking(int fd)
44 {
45         unsigned v;
46         v = fcntl(fd, F_GETFL, 0);
47         fcntl(fd, F_SETFL, v | O_NONBLOCK);
48 }
49
50
51 /*
52   structure describing a connected client in the daemon
53  */
54 struct ctdb_client {
55         struct ctdb_context *ctdb;
56         int fd;
57         struct ctdb_queue *queue;
58 };
59
60
61 /*
62   message handler for when we are in daemon mode. This redirects the message
63   to the right client
64  */
65 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
66                                     TDB_DATA data, void *private)
67 {
68         struct ctdb_client *client = talloc_get_type(private, struct ctdb_client);
69         struct ctdb_req_message *r;
70         int len;
71
72         /* construct a message to send to the client containing the data */
73         len = offsetof(struct ctdb_req_message, data) + data.dsize;
74         r = ctdbd_allocate_pkt(ctdb, len);
75
76 /*XXX cant use this since it returns an int     CTDB_NO_MEMORY(ctdb, r);*/
77         talloc_set_name_const(r, "req_message packet");
78
79         r->hdr.length    = len;
80         r->hdr.ctdb_magic = CTDB_MAGIC;
81         r->hdr.ctdb_version = CTDB_VERSION;
82         r->hdr.operation = CTDB_REQ_MESSAGE;
83         r->srvid         = srvid;
84         r->datalen       = data.dsize;
85         memcpy(&r->data[0], data.dptr, data.dsize);
86         
87         ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
88
89         talloc_free(r);
90         return;
91 }
92                                            
93
94 /*
95   this is called when the ctdb daemon received a ctdb request to 
96   set the srvid from the client
97  */
98 static void daemon_request_register_message_handler(struct ctdb_client *client, 
99                                                     struct ctdb_req_register *c)
100 {
101         int res;
102         res = ctdb_register_message_handler(client->ctdb, client, 
103                                             c->srvid, daemon_message_handler, 
104                                             client);
105         if (res != 0) {
106                 printf("Failed to register handler %u in daemon\n", c->srvid);
107         }
108 }
109
110
111 /*
112   destroy a ctdb_client
113 */
114 static int ctdb_client_destructor(struct ctdb_client *client)
115 {
116         close(client->fd);
117         client->fd = -1;
118         return 0;
119 }
120
121
122 /*
123   this is called when the ctdb daemon received a ctdb request message
124   from a local client over the unix domain socket
125  */
126 static void daemon_request_message_from_client(struct ctdb_client *client, 
127                                                struct ctdb_req_message *c)
128 {
129         TDB_DATA data;
130         int res;
131
132         /* maybe the message is for another client on this node */
133         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
134                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
135                 return;
136         }
137         
138         /* its for a remote node */
139         data.dptr = &c->data[0];
140         data.dsize = c->datalen;
141         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
142                                        c->srvid, data);
143         if (res != 0) {
144                 printf("Failed to send message to remote node %u\n",
145                        c->hdr.destnode);
146         }
147 }
148
149 /*
150   this is called when the ctdb daemon received a ctdb request call
151   from a local client over the unix domain socket
152  */
153 static void daemon_request_call_from_client(struct ctdb_client *client, 
154                                             struct ctdb_req_call *c)
155 {
156         struct ctdb_call_state *state;
157         struct ctdb_db_context *ctdb_db;
158         struct ctdb_call call;
159         struct ctdb_reply_call *r;
160         int res;
161         uint32_t length;
162
163         for (ctdb_db=client->ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
164                 if (ctdb_db->db_id == c->db_id) {
165                         break;
166                 }
167         }
168         if (!ctdb_db) {
169                 printf("Unknown database in request. db_id==0x%08x",c->db_id);
170                 return;
171         }
172
173         ZERO_STRUCT(call);
174         call.call_id = c->callid;
175         call.key.dptr = c->data;
176         call.key.dsize = c->keylen;
177         call.call_data.dptr = c->data + c->keylen;
178         call.call_data.dsize = c->calldatalen;
179
180         state = ctdb_call_send(ctdb_db, &call);
181
182 /* XXX this must be converted to fully async */
183         res = ctdb_call_recv(state, &call);
184         if (res != 0) {
185                 printf("ctdbd_call_recv() returned error\n");
186                 exit(1);
187         }
188
189         length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
190         r = ctdbd_allocate_pkt(client->ctdb, length);
191         if (r == NULL) {
192                 printf("Failed to allocate reply_call in ctdb daemon\n");
193                 return;
194         }
195         ZERO_STRUCT(*r);
196         r->hdr.length       = length;
197         r->hdr.ctdb_magic   = CTDB_MAGIC;
198         r->hdr.ctdb_version = CTDB_VERSION;
199         r->hdr.operation    = CTDB_REPLY_CALL;
200         r->hdr.reqid        = c->hdr.reqid;
201         r->datalen          = call.reply_data.dsize;
202         memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
203
204         res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length);
205         if (res != 0) {
206                 printf("Failed to queue packet from daemon to client\n");
207         }
208         talloc_free(r);
209 }
210
211
212 /* data contains a packet from the client */
213 static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
214 {
215         struct ctdb_req_header *hdr = data;
216
217         if (hdr->ctdb_magic != CTDB_MAGIC) {
218                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
219                 goto done;
220         }
221
222         if (hdr->ctdb_version != CTDB_VERSION) {
223                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
224                 goto done;
225         }
226
227         switch (hdr->operation) {
228         case CTDB_REQ_CALL:
229                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
230                 break;
231
232         case CTDB_REQ_REGISTER:
233                 daemon_request_register_message_handler(client, 
234                                                         (struct ctdb_req_register *)hdr);
235                 break;
236         case CTDB_REQ_MESSAGE:
237                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
238                 break;
239         }
240
241 done:
242         talloc_free(data);
243 }
244
245
246 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
247 {
248         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
249         struct ctdb_req_header *hdr;
250
251         if (cnt < sizeof(*hdr)) {
252                 ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt);
253                 return;
254         }
255         hdr = (struct ctdb_req_header *)data;
256         if (cnt != hdr->length) {
257                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", 
258                                hdr->length, cnt);
259                 return;
260         }
261
262         if (hdr->ctdb_magic != CTDB_MAGIC) {
263                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
264                 return;
265         }
266
267         if (hdr->ctdb_version != CTDB_VERSION) {
268                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
269                 return;
270         }
271
272         /* it is the responsibility of the incoming packet function to free 'data' */
273         client_incoming_packet(client, data, cnt);
274 }
275
276 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
277                          uint16_t flags, void *private)
278 {
279         struct sockaddr_in addr;
280         socklen_t len;
281         int fd;
282         struct ctdb_context *ctdb = talloc_get_type(private, struct ctdb_context);
283         struct ctdb_client *client;
284
285         memset(&addr, 0, sizeof(addr));
286         len = sizeof(addr);
287         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
288         if (fd == -1) {
289                 return;
290         }
291         set_non_blocking(fd);
292
293         client = talloc_zero(ctdb, struct ctdb_client);
294         client->ctdb = ctdb;
295         client->fd = fd;
296
297         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
298                                          ctdb_client_read_cb, client);
299
300         talloc_set_destructor(client, ctdb_client_destructor);
301 }
302
303
304
305 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
306                          uint16_t flags, void *private)
307 {
308         int *fd = private;
309         int cnt;
310         char buf;
311
312         /* XXX this is a good place to try doing some cleaning up before exiting */
313         cnt = read(*fd, &buf, 1);
314         if (cnt==0) {
315                 printf("parent process exited. filedescriptor dissappeared\n");
316                 exit(1);
317         } else {
318                 printf("ctdb: did not expect data from parent process\n");
319                 exit(1);
320         }
321 }
322
323
324
325 /*
326   create a unix domain socket and bind it
327   return a file descriptor open on the socket 
328 */
329 static int ux_socket_bind(struct ctdb_context *ctdb)
330 {
331         struct sockaddr_un addr;
332
333         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
334         if (ctdb->daemon.sd == -1) {
335                 ctdb->daemon.sd = -1;
336                 return -1;
337         }
338
339         set_non_blocking(ctdb->daemon.sd);
340
341         memset(&addr, 0, sizeof(addr));
342         addr.sun_family = AF_UNIX;
343         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
344
345         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
346                 close(ctdb->daemon.sd);
347                 ctdb->daemon.sd = -1;
348                 return -1;
349         }       
350         listen(ctdb->daemon.sd, 1);
351
352         return 0;
353 }
354
355 /*
356   delete the socket on exit - called on destruction of autofree context
357  */
358 static int unlink_destructor(const char *name)
359 {
360         unlink(name);
361         return 0;
362 }
363
364 /*
365   start the protocol going
366 */
367 int ctdbd_start(struct ctdb_context *ctdb)
368 {
369         pid_t pid;
370         static int fd[2];
371         int res;
372         struct fd_event *fde;
373         const char *domain_socket_name;
374
375         /* generate a name to use for our local socket */
376         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
377         /* get rid of any old sockets */
378         unlink(ctdb->daemon.name);
379
380         /* create a unix domain stream socket to listen to */
381         res = ux_socket_bind(ctdb);
382         if (res!=0) {
383                 printf("Failed to open CTDB unix domain socket\n");
384                 exit(10);
385         }
386
387         res = pipe(&fd[0]);
388         if (res) {
389                 printf("Failed to open pipe for CTDB\n");
390                 exit(1);
391         }
392         pid = fork();
393         if (pid==-1) {
394                 printf("Failed to fork CTDB daemon\n");
395                 exit(1);
396         }
397
398         if (pid) {
399                 close(fd[0]);
400                 close(ctdb->daemon.sd);
401                 ctdb->daemon.sd = -1;
402                 return 0;
403         }
404
405         /* ensure the socket is deleted on exit of the daemon */
406         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
407         talloc_set_destructor(domain_socket_name, unlink_destructor);   
408         
409         close(fd[1]);
410         ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
411         ctdb->ev = event_context_init(NULL);
412         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
413         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
414         ctdb_main_loop(ctdb);
415
416         return 0;
417 }
418
419 /*
420   allocate a packet for use in client<->daemon communication
421  */
422 void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
423 {
424         int size;
425
426         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
427         return talloc_size(ctdb, size);
428 }
429
430 int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
431                              ctdb_message_fn_t handler,
432                              void *private)
433 {
434         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private);
435 }
436