merge from tridge
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void ctdb_main_loop(struct ctdb_context *ctdb)
32 {
33         ctdb->methods->start(ctdb);
34
35         /* go into a wait loop to allow other nodes to complete */
36         event_loop_wait(ctdb->ev);
37
38         printf("event_loop_wait() returned. this should not happen\n");
39         exit(1);
40 }
41
42
43 static void set_non_blocking(int fd)
44 {
45         unsigned v;
46         v = fcntl(fd, F_GETFL, 0);
47         fcntl(fd, F_SETFL, v | O_NONBLOCK);
48 }
49
50
51 /*
52   structure describing a connected client in the daemon
53  */
54 struct ctdb_client {
55         struct ctdb_context *ctdb;
56         int fd;
57         struct ctdb_queue *queue;
58 };
59
60
61 /*
62   destroy a ctdb_client
63 */
64 static int ctdb_client_destructor(struct ctdb_client *client)
65 {
66         close(client->fd);
67         client->fd = -1;
68         return 0;
69 }
70
71
72 /*
73   this is called when the ctdb daemon received a ctdb request call
74   from a local client over the unix domain socket
75  */
76 static void daemon_request_call_from_client(struct ctdb_client *client, 
77                                             struct ctdb_req_call *c)
78 {
79         struct ctdb_call_state *state;
80         struct ctdb_db_context *ctdb_db;
81         struct ctdb_call call;
82         struct ctdb_reply_call *r;
83         int res;
84         uint32_t length;
85
86         for (ctdb_db=client->ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
87                 if (ctdb_db->db_id == c->db_id) {
88                         break;
89                 }
90         }
91         if (!ctdb_db) {
92                 printf("Unknown database in request. db_id==0x%08x",c->db_id);
93                 return;
94         }
95
96         ZERO_STRUCT(call);
97         call.call_id = c->callid;
98         call.key.dptr = c->data;
99         call.key.dsize = c->keylen;
100         call.call_data.dptr = c->data + c->keylen;
101         call.call_data.dsize = c->calldatalen;
102
103         state = ctdb_call_send(ctdb_db, &call);
104
105 /* XXX this must be converted to fully async */
106         res = ctdb_call_recv(state, &call);
107         if (res != 0) {
108                 printf("ctdbd_call_recv() returned error\n");
109                 exit(1);
110         }
111
112         length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
113         r = ctdbd_allocate_pkt(client->ctdb, length);
114         if (r == NULL) {
115                 printf("Failed to allocate reply_call in ctdb daemon\n");
116                 return;
117         }
118         ZERO_STRUCT(*r);
119         r->hdr.length       = length;
120         r->hdr.ctdb_magic   = CTDB_MAGIC;
121         r->hdr.ctdb_version = CTDB_VERSION;
122         r->hdr.operation    = CTDB_REPLY_CALL;
123         r->hdr.reqid        = c->hdr.reqid;
124         r->datalen          = call.reply_data.dsize;
125         memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
126
127         res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length);
128         if (res != 0) {
129                 printf("Failed to queue packet from daemon to client\n");
130         }
131         talloc_free(r);
132 }
133
134
135 /* data contains a packet from the client */
136 static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
137 {
138         struct ctdb_req_header *hdr = data;
139
140         if (hdr->ctdb_magic != CTDB_MAGIC) {
141                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
142                 return;
143         }
144
145         if (hdr->ctdb_version != CTDB_VERSION) {
146                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
147                 return;
148         }
149
150         switch (hdr->operation) {
151         case CTDB_REQ_CALL:
152                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
153                 break;
154
155         }
156
157         talloc_free(data);
158 }
159
160
161 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
162 {
163         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
164         struct ctdb_req_header *hdr;
165
166         if (cnt < sizeof(*hdr)) {
167                 ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt);
168                 return;
169         }
170         hdr = (struct ctdb_req_header *)data;
171         if (cnt != hdr->length) {
172                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", 
173                                hdr->length, cnt);
174                 return;
175         }
176
177         if (hdr->ctdb_magic != CTDB_MAGIC) {
178                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
179                 return;
180         }
181
182         if (hdr->ctdb_version != CTDB_VERSION) {
183                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
184                 return;
185         }
186
187         /* it is the responsibility of the incoming packet function to free 'data' */
188         client_incoming_packet(client, data, cnt);
189 }
190
191 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
192                          uint16_t flags, void *private)
193 {
194         struct sockaddr_in addr;
195         socklen_t len;
196         int fd;
197         struct ctdb_context *ctdb = talloc_get_type(private, struct ctdb_context);
198         struct ctdb_client *client;
199
200         memset(&addr, 0, sizeof(addr));
201         len = sizeof(addr);
202         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
203         if (fd == -1) {
204                 return;
205         }
206         set_non_blocking(fd);
207
208         client = talloc_zero(ctdb, struct ctdb_client);
209         client->ctdb = ctdb;
210         client->fd = fd;
211
212         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
213                                          ctdb_client_read_cb, client);
214
215         talloc_set_destructor(client, ctdb_client_destructor);
216 }
217
218
219
220 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
221                          uint16_t flags, void *private)
222 {
223         int *fd = private;
224         int cnt;
225         char buf;
226
227         /* XXX this is a good place to try doing some cleaning up before exiting */
228         cnt = read(*fd, &buf, 1);
229         if (cnt==0) {
230                 printf("parent process exited. filedescriptor dissappeared\n");
231                 exit(1);
232         } else {
233                 printf("ctdb: did not expect data from parent process\n");
234                 exit(1);
235         }
236 }
237
238
239
240 /*
241   create a unix domain socket and bind it
242   return a file descriptor open on the socket 
243 */
244 static int ux_socket_bind(struct ctdb_context *ctdb)
245 {
246         struct sockaddr_un addr;
247
248         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
249         if (ctdb->daemon.sd == -1) {
250                 ctdb->daemon.sd = -1;
251                 return -1;
252         }
253
254         set_non_blocking(ctdb->daemon.sd);
255
256         memset(&addr, 0, sizeof(addr));
257         addr.sun_family = AF_UNIX;
258         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
259
260         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
261                 close(ctdb->daemon.sd);
262                 ctdb->daemon.sd = -1;
263                 return -1;
264         }       
265         listen(ctdb->daemon.sd, 1);
266
267         return 0;
268 }
269
270 /*
271   delete the socket on exit - called on destruction of autofree context
272  */
273 static int unlink_destructor(const char *name)
274 {
275         unlink(name);
276         return 0;
277 }
278
279 /*
280   start the protocol going
281 */
282 int ctdbd_start(struct ctdb_context *ctdb)
283 {
284         pid_t pid;
285         static int fd[2];
286         int res;
287         struct fd_event *fde;
288         const char *domain_socket_name;
289
290         /* generate a name to use for our local socket */
291         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
292         /* get rid of any old sockets */
293         unlink(ctdb->daemon.name);
294
295         /* create a unix domain stream socket to listen to */
296         res = ux_socket_bind(ctdb);
297         if (res!=0) {
298                 printf("Failed to open CTDB unix domain socket\n");
299                 exit(10);
300         }
301
302         res = pipe(&fd[0]);
303         if (res) {
304                 printf("Failed to open pipe for CTDB\n");
305                 exit(1);
306         }
307         pid = fork();
308         if (pid==-1) {
309                 printf("Failed to fork CTDB daemon\n");
310                 exit(1);
311         }
312
313         if (pid) {
314                 close(fd[0]);
315                 close(ctdb->daemon.sd);
316                 ctdb->daemon.sd = -1;
317                 return 0;
318         }
319
320         /* ensure the socket is deleted on exit of the daemon */
321         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
322         talloc_set_destructor(domain_socket_name, unlink_destructor);   
323         
324         close(fd[1]);
325         ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
326         ctdb->ev = event_context_init(NULL);
327         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
328         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
329         ctdb_main_loop(ctdb);
330
331         return 0;
332 }
333
334 /*
335   allocate a packet for use in client<->daemon communication
336  */
337 void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
338 {
339         int size;
340
341         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
342         return talloc_size(ctdb, size);
343 }
344