We use eventloop nesting in a couple of places, notably the sync
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         if (client->ctdb->statistics.num_clients) {
188                 client->ctdb->statistics.num_clients--;
189         }
190
191         if (client->num_persistent_updates != 0) {
192                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194         }
195         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196         if (ctdb_db) {
197                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198                                   "commit active. Forcing recovery.\n"));
199                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200                 ctdb_db->transaction_active = false;
201         }
202
203         return 0;
204 }
205
206
207 /*
208   this is called when the ctdb daemon received a ctdb request message
209   from a local client over the unix domain socket
210  */
211 static void daemon_request_message_from_client(struct ctdb_client *client, 
212                                                struct ctdb_req_message *c)
213 {
214         TDB_DATA data;
215         int res;
216
217         /* maybe the message is for another client on this node */
218         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
220                 return;
221         }
222
223         /* its for a remote node */
224         data.dptr = &c->data[0];
225         data.dsize = c->datalen;
226         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227                                        c->srvid, data);
228         if (res != 0) {
229                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
230                          c->hdr.destnode));
231         }
232 }
233
234
235 struct daemon_call_state {
236         struct ctdb_client *client;
237         uint32_t reqid;
238         struct ctdb_call *call;
239         struct timeval start_time;
240 };
241
242 /* 
243    complete a call from a client 
244 */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 {
247         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
248                                                            struct daemon_call_state);
249         struct ctdb_reply_call *r;
250         int res;
251         uint32_t length;
252         struct ctdb_client *client = dstate->client;
253         struct ctdb_db_context *ctdb_db = state->ctdb_db;
254
255         talloc_steal(client, dstate);
256         talloc_steal(dstate, dstate->call);
257
258         res = ctdb_daemon_call_recv(state, dstate->call);
259         if (res != 0) {
260                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261                 if (client->ctdb->statistics.pending_calls > 0) {
262                         client->ctdb->statistics.pending_calls--;
263                 }
264                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
265                 return;
266         }
267
268         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
270                                length, struct ctdb_reply_call);
271         if (r == NULL) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273                 if (client->ctdb->statistics.pending_calls > 0) {
274                         client->ctdb->statistics.pending_calls--;
275                 }
276                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
277                 return;
278         }
279         r->hdr.reqid        = dstate->reqid;
280         r->datalen          = dstate->call->reply_data.dsize;
281         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282
283         res = daemon_queue_send(client, &r->hdr);
284         if (res == -1) {
285                 /* client is dead - return immediately */
286                 return;
287         }
288         if (res != 0) {
289                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
290         }
291         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
292         talloc_free(dstate);
293         if (client->ctdb->statistics.pending_calls > 0) {
294                 client->ctdb->statistics.pending_calls--;
295         }
296 }
297
298 struct ctdb_daemon_packet_wrap {
299         struct ctdb_context *ctdb;
300         uint32_t client_id;
301 };
302
303 /*
304   a wrapper to catch disconnected clients
305  */
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
307 {
308         struct ctdb_client *client;
309         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
310                                                             struct ctdb_daemon_packet_wrap);
311         if (w == NULL) {
312                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
313                 return;
314         }
315
316         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317         if (client == NULL) {
318                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
319                          w->client_id));
320                 talloc_free(w);
321                 return;
322         }
323         talloc_free(w);
324
325         /* process it */
326         daemon_incoming_packet(client, hdr);    
327 }
328
329
330 /*
331   this is called when the ctdb daemon received a ctdb request call
332   from a local client over the unix domain socket
333  */
334 static void daemon_request_call_from_client(struct ctdb_client *client, 
335                                             struct ctdb_req_call *c)
336 {
337         struct ctdb_call_state *state;
338         struct ctdb_db_context *ctdb_db;
339         struct daemon_call_state *dstate;
340         struct ctdb_call *call;
341         struct ctdb_ltdb_header header;
342         TDB_DATA key, data;
343         int ret;
344         struct ctdb_context *ctdb = client->ctdb;
345         struct ctdb_daemon_packet_wrap *w;
346
347         ctdb->statistics.total_calls++;
348         if (client->ctdb->statistics.pending_calls > 0) {
349                 ctdb->statistics.pending_calls++;
350         }
351
352         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
353         if (!ctdb_db) {
354                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
355                           c->db_id));
356                 if (client->ctdb->statistics.pending_calls > 0) {
357                         ctdb->statistics.pending_calls--;
358                 }
359                 return;
360         }
361
362         if (ctdb_db->unhealthy_reason) {
363                 /*
364                  * this is just a warning, as the tdb should be empty anyway,
365                  * and only persistent databases can be unhealthy, which doesn't
366                  * use this code patch
367                  */
368                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
370         }
371
372         key.dptr = c->data;
373         key.dsize = c->keylen;
374
375         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376         CTDB_NO_MEMORY_VOID(ctdb, w);   
377
378         w->ctdb = ctdb;
379         w->client_id = client->client_id;
380
381         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
382                                            (struct ctdb_req_header *)c, &data,
383                                            daemon_incoming_packet_wrap, w, True);
384         if (ret == -2) {
385                 /* will retry later */
386                 if (client->ctdb->statistics.pending_calls > 0) {
387                         ctdb->statistics.pending_calls--;
388                 }
389                 return;
390         }
391
392         talloc_free(w);
393
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396                 if (client->ctdb->statistics.pending_calls > 0) {
397                         ctdb->statistics.pending_calls--;
398                 }
399                 return;
400         }
401
402         dstate = talloc(client, struct daemon_call_state);
403         if (dstate == NULL) {
404                 ret = ctdb_ltdb_unlock(ctdb_db, key);
405                 if (ret != 0) {
406                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
407                 }
408
409                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
410                 if (client->ctdb->statistics.pending_calls > 0) {
411                         ctdb->statistics.pending_calls--;
412                 }
413                 return;
414         }
415         dstate->start_time = timeval_current();
416         dstate->client = client;
417         dstate->reqid  = c->hdr.reqid;
418         talloc_steal(dstate, data.dptr);
419
420         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
421         if (call == NULL) {
422                 ret = ctdb_ltdb_unlock(ctdb_db, key);
423                 if (ret != 0) {
424                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
425                 }
426
427                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
428                 if (client->ctdb->statistics.pending_calls > 0) {
429                         ctdb->statistics.pending_calls--;
430                 }
431                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
432                 return;
433         }
434
435         call->call_id = c->callid;
436         call->key = key;
437         call->call_data.dptr = c->data + c->keylen;
438         call->call_data.dsize = c->calldatalen;
439         call->flags = c->flags;
440
441         if (header.dmaster == ctdb->pnn) {
442                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
443         } else {
444                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
445         }
446
447         ret = ctdb_ltdb_unlock(ctdb_db, key);
448         if (ret != 0) {
449                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
450         }
451
452         if (state == NULL) {
453                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
454                 if (client->ctdb->statistics.pending_calls > 0) {
455                         ctdb->statistics.pending_calls--;
456                 }
457                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
458                 return;
459         }
460         talloc_steal(state, dstate);
461         talloc_steal(client, state);
462
463         state->async.fn = daemon_call_from_client_callback;
464         state->async.private_data = dstate;
465 }
466
467
468 static void daemon_request_control_from_client(struct ctdb_client *client, 
469                                                struct ctdb_req_control *c);
470
471 /* data contains a packet from the client */
472 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
473 {
474         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
475         TALLOC_CTX *tmp_ctx;
476         struct ctdb_context *ctdb = client->ctdb;
477
478         /* place the packet as a child of a tmp_ctx. We then use
479            talloc_free() below to free it. If any of the calls want
480            to keep it, then they will steal it somewhere else, and the
481            talloc_free() will be a no-op */
482         tmp_ctx = talloc_new(client);
483         talloc_steal(tmp_ctx, hdr);
484
485         if (hdr->ctdb_magic != CTDB_MAGIC) {
486                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
487                 goto done;
488         }
489
490         if (hdr->ctdb_version != CTDB_VERSION) {
491                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
492                 goto done;
493         }
494
495         switch (hdr->operation) {
496         case CTDB_REQ_CALL:
497                 ctdb->statistics.client.req_call++;
498                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
499                 break;
500
501         case CTDB_REQ_MESSAGE:
502                 ctdb->statistics.client.req_message++;
503                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
504                 break;
505
506         case CTDB_REQ_CONTROL:
507                 ctdb->statistics.client.req_control++;
508                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
509                 break;
510
511         default:
512                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
513                          hdr->operation));
514         }
515
516 done:
517         talloc_free(tmp_ctx);
518 }
519
520 /*
521   called when the daemon gets a incoming packet
522  */
523 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
524 {
525         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
526         struct ctdb_req_header *hdr;
527
528         if (cnt == 0) {
529                 talloc_free(client);
530                 return;
531         }
532
533         client->ctdb->statistics.client_packets_recv++;
534
535         if (cnt < sizeof(*hdr)) {
536                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
537                                (unsigned)cnt);
538                 return;
539         }
540         hdr = (struct ctdb_req_header *)data;
541         if (cnt != hdr->length) {
542                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
543                                (unsigned)hdr->length, (unsigned)cnt);
544                 return;
545         }
546
547         if (hdr->ctdb_magic != CTDB_MAGIC) {
548                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
549                 return;
550         }
551
552         if (hdr->ctdb_version != CTDB_VERSION) {
553                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
554                 return;
555         }
556
557         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
558                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
559                  hdr->srcnode, hdr->destnode));
560
561         /* it is the responsibility of the incoming packet function to free 'data' */
562         daemon_incoming_packet(client, hdr);
563 }
564
565
566 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
567 {
568         if (client_pid->ctdb->client_pids != NULL) {
569                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
570         }
571
572         return 0;
573 }
574
575
576 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
577                          uint16_t flags, void *private_data)
578 {
579         struct sockaddr_un addr;
580         socklen_t len;
581         int fd;
582         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
583         struct ctdb_client *client;
584         struct ctdb_client_pid_list *client_pid;
585 #ifdef _AIX
586         struct peercred_struct cr;
587         socklen_t crl = sizeof(struct peercred_struct);
588 #else
589         struct ucred cr;
590         socklen_t crl = sizeof(struct ucred);
591 #endif
592
593         memset(&addr, 0, sizeof(addr));
594         len = sizeof(addr);
595         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
596         if (fd == -1) {
597                 return;
598         }
599
600         set_nonblocking(fd);
601         set_close_on_exec(fd);
602
603         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
604
605         client = talloc_zero(ctdb, struct ctdb_client);
606 #ifdef _AIX
607         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
608 #else
609         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
610 #endif
611                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
612         }
613
614         client->ctdb = ctdb;
615         client->fd = fd;
616         client->client_id = ctdb_reqid_new(ctdb, client);
617         client->pid = cr.pid;
618
619         client_pid = talloc(client, struct ctdb_client_pid_list);
620         if (client_pid == NULL) {
621                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
622                 close(fd);
623                 talloc_free(client);
624                 return;
625         }               
626         client_pid->ctdb   = ctdb;
627         client_pid->pid    = cr.pid;
628         client_pid->client = client;
629
630         DLIST_ADD(ctdb->client_pids, client_pid);
631
632         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
633                                          ctdb_daemon_read_cb, client,
634                                          "client-%u", client->pid);
635
636         talloc_set_destructor(client, ctdb_client_destructor);
637         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
638         ctdb->statistics.num_clients++;
639 }
640
641
642
643 /*
644   create a unix domain socket and bind it
645   return a file descriptor open on the socket 
646 */
647 static int ux_socket_bind(struct ctdb_context *ctdb)
648 {
649         struct sockaddr_un addr;
650
651         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
652         if (ctdb->daemon.sd == -1) {
653                 return -1;
654         }
655
656         set_close_on_exec(ctdb->daemon.sd);
657         set_nonblocking(ctdb->daemon.sd);
658
659         memset(&addr, 0, sizeof(addr));
660         addr.sun_family = AF_UNIX;
661         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
662
663         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
664                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
665                 goto failed;
666         }       
667
668         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
669             chmod(ctdb->daemon.name, 0700) != 0) {
670                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
671                 goto failed;
672         } 
673
674
675         if (listen(ctdb->daemon.sd, 100) != 0) {
676                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
677                 goto failed;
678         }
679
680         return 0;
681
682 failed:
683         close(ctdb->daemon.sd);
684         ctdb->daemon.sd = -1;
685         return -1;      
686 }
687
688 static void sig_child_handler(struct event_context *ev,
689         struct signal_event *se, int signum, int count,
690         void *dont_care, 
691         void *private_data)
692 {
693 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
694         int status;
695         pid_t pid = -1;
696
697         while (pid != 0) {
698                 pid = waitpid(-1, &status, WNOHANG);
699                 if (pid == -1) {
700                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
701                         return;
702                 }
703                 if (pid > 0) {
704                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
705                 }
706         }
707 }
708
709 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
710                                       void *private_data)
711 {
712         if (status != 0) {
713                 ctdb_fatal(ctdb, "Failed to run setup event\n");
714                 return;
715         }
716         ctdb_run_notification_script(ctdb, "setup");
717
718         /* tell all other nodes we've just started up */
719         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
720                                  0, CTDB_CONTROL_STARTUP, 0,
721                                  CTDB_CTRL_FLAG_NOREPLY,
722                                  tdb_null, NULL, NULL);
723 }
724
725 /*
726   start the protocol going as a daemon
727 */
728 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
729 {
730         int res, ret = -1;
731         struct fd_event *fde;
732         const char *domain_socket_name;
733         struct signal_event *se;
734
735         /* get rid of any old sockets */
736         unlink(ctdb->daemon.name);
737
738         /* create a unix domain stream socket to listen to */
739         res = ux_socket_bind(ctdb);
740         if (res!=0) {
741                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
742                 exit(10);
743         }
744
745         if (do_fork && fork()) {
746                 return 0;
747         }
748
749         tdb_reopen_all(False);
750
751         if (do_fork) {
752                 setsid();
753                 close(0);
754                 if (open("/dev/null", O_RDONLY) != 0) {
755                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
756                         exit(11);
757                 }
758         }
759         block_signal(SIGPIPE);
760
761         ctdbd_pid = getpid();
762
763
764         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
765
766         ctdb_high_priority(ctdb);
767
768         /* ensure the socket is deleted on exit of the daemon */
769         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
770         if (domain_socket_name == NULL) {
771                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
772                 exit(12);
773         }
774
775         ctdb->ev = event_context_init(NULL);
776         tevent_loop_allow_nesting(ctdb->ev);
777
778         ctdb_set_child_logging(ctdb);
779
780         /* force initial recovery for election */
781         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
782
783         if (strcmp(ctdb->transport, "tcp") == 0) {
784                 int ctdb_tcp_init(struct ctdb_context *);
785                 ret = ctdb_tcp_init(ctdb);
786         }
787 #ifdef USE_INFINIBAND
788         if (strcmp(ctdb->transport, "ib") == 0) {
789                 int ctdb_ibw_init(struct ctdb_context *);
790                 ret = ctdb_ibw_init(ctdb);
791         }
792 #endif
793         if (ret != 0) {
794                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
795                 return -1;
796         }
797
798         if (ctdb->methods == NULL) {
799                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
800                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
801         }
802
803         /* initialise the transport  */
804         if (ctdb->methods->initialise(ctdb) != 0) {
805                 ctdb_fatal(ctdb, "transport failed to initialise");
806         }
807
808         /* attach to existing databases */
809         if (ctdb_attach_databases(ctdb) != 0) {
810                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
811         }
812
813         /* start frozen, then let the first election sort things out */
814         if (ctdb_blocking_freeze(ctdb)) {
815                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
816         }
817
818         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
819         if (ret != 0) {
820                 ctdb_fatal(ctdb, "Failed to run init event\n");
821         }
822         ctdb_run_notification_script(ctdb, "init");
823
824         /* now start accepting clients, only can do this once frozen */
825         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
826                            EVENT_FD_READ,
827                            ctdb_accept_client, ctdb);
828         tevent_fd_set_auto_close(fde);
829
830         /* release any IPs we hold from previous runs of the daemon */
831         ctdb_release_all_ips(ctdb);
832
833         /* start the transport going */
834         ctdb_start_transport(ctdb);
835
836         /* set up a handler to pick up sigchld */
837         se = event_add_signal(ctdb->ev, ctdb,
838                                      SIGCHLD, 0,
839                                      sig_child_handler,
840                                      ctdb);
841         if (se == NULL) {
842                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
843                 exit(1);
844         }
845
846         ret = ctdb_event_script_callback(ctdb,
847                                          ctdb,
848                                          ctdb_setup_event_callback,
849                                          ctdb,
850                                          false,
851                                          CTDB_EVENT_SETUP,
852                                          "");
853         if (ret != 0) {
854                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
855                 exit(1);
856         }
857
858         if (use_syslog) {
859                 if (start_syslog_daemon(ctdb)) {
860                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
861                         exit(10);
862                 }
863         }
864
865         ctdb_lockdown_memory(ctdb);
866           
867         /* go into a wait loop to allow other nodes to complete */
868         event_loop_wait(ctdb->ev);
869
870         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
871         exit(1);
872 }
873
874 /*
875   allocate a packet for use in daemon<->daemon communication
876  */
877 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
878                                                  TALLOC_CTX *mem_ctx, 
879                                                  enum ctdb_operation operation, 
880                                                  size_t length, size_t slength,
881                                                  const char *type)
882 {
883         int size;
884         struct ctdb_req_header *hdr;
885
886         length = MAX(length, slength);
887         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
888
889         if (ctdb->methods == NULL) {
890                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
891                          operation, (unsigned)length));
892                 return NULL;
893         }
894
895         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
896         if (hdr == NULL) {
897                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
898                          operation, (unsigned)length));
899                 return NULL;
900         }
901         talloc_set_name_const(hdr, type);
902         memset(hdr, 0, slength);
903         hdr->length       = length;
904         hdr->operation    = operation;
905         hdr->ctdb_magic   = CTDB_MAGIC;
906         hdr->ctdb_version = CTDB_VERSION;
907         hdr->generation   = ctdb->vnn_map->generation;
908         hdr->srcnode      = ctdb->pnn;
909
910         return hdr;     
911 }
912
913 struct daemon_control_state {
914         struct daemon_control_state *next, *prev;
915         struct ctdb_client *client;
916         struct ctdb_req_control *c;
917         uint32_t reqid;
918         struct ctdb_node *node;
919 };
920
921 /*
922   callback when a control reply comes in
923  */
924 static void daemon_control_callback(struct ctdb_context *ctdb,
925                                     int32_t status, TDB_DATA data, 
926                                     const char *errormsg,
927                                     void *private_data)
928 {
929         struct daemon_control_state *state = talloc_get_type(private_data, 
930                                                              struct daemon_control_state);
931         struct ctdb_client *client = state->client;
932         struct ctdb_reply_control *r;
933         size_t len;
934         int ret;
935
936         /* construct a message to send to the client containing the data */
937         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
938         if (errormsg) {
939                 len += strlen(errormsg);
940         }
941         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
942                                struct ctdb_reply_control);
943         CTDB_NO_MEMORY_VOID(ctdb, r);
944
945         r->hdr.reqid     = state->reqid;
946         r->status        = status;
947         r->datalen       = data.dsize;
948         r->errorlen = 0;
949         memcpy(&r->data[0], data.dptr, data.dsize);
950         if (errormsg) {
951                 r->errorlen = strlen(errormsg);
952                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
953         }
954
955         ret = daemon_queue_send(client, &r->hdr);
956         if (ret != -1) {
957                 talloc_free(state);
958         }
959 }
960
961 /*
962   fail all pending controls to a disconnected node
963  */
964 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
965 {
966         struct daemon_control_state *state;
967         while ((state = node->pending_controls)) {
968                 DLIST_REMOVE(node->pending_controls, state);
969                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
970                                         "node is disconnected", state);
971         }
972 }
973
974 /*
975   destroy a daemon_control_state
976  */
977 static int daemon_control_destructor(struct daemon_control_state *state)
978 {
979         if (state->node) {
980                 DLIST_REMOVE(state->node->pending_controls, state);
981         }
982         return 0;
983 }
984
985 /*
986   this is called when the ctdb daemon received a ctdb request control
987   from a local client over the unix domain socket
988  */
989 static void daemon_request_control_from_client(struct ctdb_client *client, 
990                                                struct ctdb_req_control *c)
991 {
992         TDB_DATA data;
993         int res;
994         struct daemon_control_state *state;
995         TALLOC_CTX *tmp_ctx = talloc_new(client);
996
997         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
998                 c->hdr.destnode = client->ctdb->pnn;
999         }
1000
1001         state = talloc(client, struct daemon_control_state);
1002         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1003
1004         state->client = client;
1005         state->c = talloc_steal(state, c);
1006         state->reqid = c->hdr.reqid;
1007         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1008                 state->node = client->ctdb->nodes[c->hdr.destnode];
1009                 DLIST_ADD(state->node->pending_controls, state);
1010         } else {
1011                 state->node = NULL;
1012         }
1013
1014         talloc_set_destructor(state, daemon_control_destructor);
1015
1016         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1017                 talloc_steal(tmp_ctx, state);
1018         }
1019         
1020         data.dptr = &c->data[0];
1021         data.dsize = c->datalen;
1022         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1023                                        c->srvid, c->opcode, client->client_id,
1024                                        c->flags,
1025                                        data, daemon_control_callback,
1026                                        state);
1027         if (res != 0) {
1028                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1029                          c->hdr.destnode));
1030         }
1031
1032         talloc_free(tmp_ctx);
1033 }
1034
1035 /*
1036   register a call function
1037 */
1038 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1039                          ctdb_fn_t fn, int id)
1040 {
1041         struct ctdb_registered_call *call;
1042         struct ctdb_db_context *ctdb_db;
1043
1044         ctdb_db = find_ctdb_db(ctdb, db_id);
1045         if (ctdb_db == NULL) {
1046                 return -1;
1047         }
1048
1049         call = talloc(ctdb_db, struct ctdb_registered_call);
1050         call->fn = fn;
1051         call->id = id;
1052
1053         DLIST_ADD(ctdb_db->calls, call);        
1054         return 0;
1055 }
1056
1057
1058
1059 /*
1060   this local messaging handler is ugly, but is needed to prevent
1061   recursion in ctdb_send_message() when the destination node is the
1062   same as the source node
1063  */
1064 struct ctdb_local_message {
1065         struct ctdb_context *ctdb;
1066         uint64_t srvid;
1067         TDB_DATA data;
1068 };
1069
1070 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1071                                        struct timeval t, void *private_data)
1072 {
1073         struct ctdb_local_message *m = talloc_get_type(private_data, 
1074                                                        struct ctdb_local_message);
1075         int res;
1076
1077         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1078         if (res != 0) {
1079                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1080                           (unsigned long long)m->srvid));
1081         }
1082         talloc_free(m);
1083 }
1084
1085 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1086 {
1087         struct ctdb_local_message *m;
1088         m = talloc(ctdb, struct ctdb_local_message);
1089         CTDB_NO_MEMORY(ctdb, m);
1090
1091         m->ctdb = ctdb;
1092         m->srvid = srvid;
1093         m->data  = data;
1094         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1095         if (m->data.dptr == NULL) {
1096                 talloc_free(m);
1097                 return -1;
1098         }
1099
1100         /* this needs to be done as an event to prevent recursion */
1101         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1102         return 0;
1103 }
1104
1105 /*
1106   send a ctdb message
1107 */
1108 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1109                              uint64_t srvid, TDB_DATA data)
1110 {
1111         struct ctdb_req_message *r;
1112         int len;
1113
1114         if (ctdb->methods == NULL) {
1115                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1116                 return -1;
1117         }
1118
1119         /* see if this is a message to ourselves */
1120         if (pnn == ctdb->pnn) {
1121                 return ctdb_local_message(ctdb, srvid, data);
1122         }
1123
1124         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1125         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1126                                     struct ctdb_req_message);
1127         CTDB_NO_MEMORY(ctdb, r);
1128
1129         r->hdr.destnode  = pnn;
1130         r->srvid         = srvid;
1131         r->datalen       = data.dsize;
1132         memcpy(&r->data[0], data.dptr, data.dsize);
1133
1134         ctdb_queue_packet(ctdb, &r->hdr);
1135
1136         talloc_free(r);
1137         return 0;
1138 }
1139
1140
1141
1142 struct ctdb_client_notify_list {
1143         struct ctdb_client_notify_list *next, *prev;
1144         struct ctdb_context *ctdb;
1145         uint64_t srvid;
1146         TDB_DATA data;
1147 };
1148
1149
1150 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1151 {
1152         int ret;
1153
1154         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1155
1156         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1157         if (ret != 0) {
1158                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1159         }
1160
1161         return 0;
1162 }
1163
1164 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1165 {
1166         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1167         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1168         struct ctdb_client_notify_list *nl;
1169
1170         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1171
1172         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1173                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1174                 return -1;
1175         }
1176
1177         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1178                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1179                 return -1;
1180         }
1181
1182
1183         if (client == NULL) {
1184                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1185                 return -1;
1186         }
1187
1188         for(nl=client->notify; nl; nl=nl->next) {
1189                 if (nl->srvid == notify->srvid) {
1190                         break;
1191                 }
1192         }
1193         if (nl != NULL) {
1194                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1195                 return -1;
1196         }
1197
1198         nl = talloc(client, struct ctdb_client_notify_list);
1199         CTDB_NO_MEMORY(ctdb, nl);
1200         nl->ctdb       = ctdb;
1201         nl->srvid      = notify->srvid;
1202         nl->data.dsize = notify->len;
1203         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1204         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1205         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1206         
1207         DLIST_ADD(client->notify, nl);
1208         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1209
1210         return 0;
1211 }
1212
1213 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1214 {
1215         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1216         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1217         struct ctdb_client_notify_list *nl;
1218
1219         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1220
1221         if (client == NULL) {
1222                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1223                 return -1;
1224         }
1225
1226         for(nl=client->notify; nl; nl=nl->next) {
1227                 if (nl->srvid == notify->srvid) {
1228                         break;
1229                 }
1230         }
1231         if (nl == NULL) {
1232                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1233                 return -1;
1234         }
1235
1236         DLIST_REMOVE(client->notify, nl);
1237         talloc_set_destructor(nl, NULL);
1238         talloc_free(nl);
1239
1240         return 0;
1241 }
1242
1243 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1244 {
1245         struct ctdb_client_pid_list *client_pid;
1246
1247         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1248                 if (client_pid->pid == pid) {
1249                         return client_pid->client;
1250                 }
1251         }
1252         return NULL;
1253 }
1254
1255
1256 /* This control is used by samba when probing if a process (of a samba daemon)
1257    exists on the node.
1258    Samba does this when it needs/wants to check if a subrecord in one of the
1259    databases is still valied, or if it is stale and can be removed.
1260    If the node is in unhealthy or stopped state we just kill of the samba
1261    process holding htis sub-record and return to the calling samba that
1262    the process does not exist.
1263    This allows us to forcefully recall subrecords registered by samba processes
1264    on banned and stopped nodes.
1265 */
1266 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1267 {
1268         struct ctdb_client *client;
1269
1270         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1271                 client = ctdb_find_client_by_pid(ctdb, pid);
1272                 if (client != NULL) {
1273                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1274                         talloc_free(client);
1275                 }
1276                 return -1;
1277         }
1278
1279         return kill(pid, 0);
1280 }