Add rolling statistics that are collected across 10 second intervals.
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
188
189         if (client->num_persistent_updates != 0) {
190                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
191                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
192         }
193         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
194         if (ctdb_db) {
195                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
196                                   "commit active. Forcing recovery.\n"));
197                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
198                 ctdb_db->transaction_active = false;
199         }
200
201         return 0;
202 }
203
204
205 /*
206   this is called when the ctdb daemon received a ctdb request message
207   from a local client over the unix domain socket
208  */
209 static void daemon_request_message_from_client(struct ctdb_client *client, 
210                                                struct ctdb_req_message *c)
211 {
212         TDB_DATA data;
213         int res;
214
215         /* maybe the message is for another client on this node */
216         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
217                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
218                 return;
219         }
220
221         /* its for a remote node */
222         data.dptr = &c->data[0];
223         data.dsize = c->datalen;
224         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
225                                        c->srvid, data);
226         if (res != 0) {
227                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
228                          c->hdr.destnode));
229         }
230 }
231
232
233 struct daemon_call_state {
234         struct ctdb_client *client;
235         uint32_t reqid;
236         struct ctdb_call *call;
237         struct timeval start_time;
238 };
239
240 /* 
241    complete a call from a client 
242 */
243 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
244 {
245         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
246                                                            struct daemon_call_state);
247         struct ctdb_reply_call *r;
248         int res;
249         uint32_t length;
250         struct ctdb_client *client = dstate->client;
251         struct ctdb_db_context *ctdb_db = state->ctdb_db;
252
253         talloc_steal(client, dstate);
254         talloc_steal(dstate, dstate->call);
255
256         res = ctdb_daemon_call_recv(state, dstate->call);
257         if (res != 0) {
258                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
259                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
260
261                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", max_call_latency, dstate->start_time);
262                 return;
263         }
264
265         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
266         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
267                                length, struct ctdb_reply_call);
268         if (r == NULL) {
269                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
270                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
271                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", max_call_latency, dstate->start_time);
272                 return;
273         }
274         r->hdr.reqid        = dstate->reqid;
275         r->datalen          = dstate->call->reply_data.dsize;
276         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
277
278         res = daemon_queue_send(client, &r->hdr);
279         if (res == -1) {
280                 /* client is dead - return immediately */
281                 return;
282         }
283         if (res != 0) {
284                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", max_call_latency, dstate->start_time);
287         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
288         talloc_free(dstate);
289 }
290
291 struct ctdb_daemon_packet_wrap {
292         struct ctdb_context *ctdb;
293         uint32_t client_id;
294 };
295
296 /*
297   a wrapper to catch disconnected clients
298  */
299 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
300 {
301         struct ctdb_client *client;
302         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
303                                                             struct ctdb_daemon_packet_wrap);
304         if (w == NULL) {
305                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
306                 return;
307         }
308
309         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
310         if (client == NULL) {
311                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
312                          w->client_id));
313                 talloc_free(w);
314                 return;
315         }
316         talloc_free(w);
317
318         /* process it */
319         daemon_incoming_packet(client, hdr);    
320 }
321
322
323 /*
324   this is called when the ctdb daemon received a ctdb request call
325   from a local client over the unix domain socket
326  */
327 static void daemon_request_call_from_client(struct ctdb_client *client, 
328                                             struct ctdb_req_call *c)
329 {
330         struct ctdb_call_state *state;
331         struct ctdb_db_context *ctdb_db;
332         struct daemon_call_state *dstate;
333         struct ctdb_call *call;
334         struct ctdb_ltdb_header header;
335         TDB_DATA key, data;
336         int ret;
337         struct ctdb_context *ctdb = client->ctdb;
338         struct ctdb_daemon_packet_wrap *w;
339
340         CTDB_INCREMENT_STAT(ctdb, total_calls);
341         CTDB_DECREMENT_STAT(ctdb, pending_calls);
342
343         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
344         if (!ctdb_db) {
345                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
346                           c->db_id));
347                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
348                 return;
349         }
350
351         if (ctdb_db->unhealthy_reason) {
352                 /*
353                  * this is just a warning, as the tdb should be empty anyway,
354                  * and only persistent databases can be unhealthy, which doesn't
355                  * use this code patch
356                  */
357                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
358                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
359         }
360
361         key.dptr = c->data;
362         key.dsize = c->keylen;
363
364         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
365         CTDB_NO_MEMORY_VOID(ctdb, w);   
366
367         w->ctdb = ctdb;
368         w->client_id = client->client_id;
369
370         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
371                                            (struct ctdb_req_header *)c, &data,
372                                            daemon_incoming_packet_wrap, w, True);
373         if (ret == -2) {
374                 /* will retry later */
375                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
376                 return;
377         }
378
379         talloc_free(w);
380
381         if (ret != 0) {
382                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
383                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
384                 return;
385         }
386
387         dstate = talloc(client, struct daemon_call_state);
388         if (dstate == NULL) {
389                 ret = ctdb_ltdb_unlock(ctdb_db, key);
390                 if (ret != 0) {
391                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
392                 }
393
394                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
395                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
396                 return;
397         }
398         dstate->start_time = timeval_current();
399         dstate->client = client;
400         dstate->reqid  = c->hdr.reqid;
401         talloc_steal(dstate, data.dptr);
402
403         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
404         if (call == NULL) {
405                 ret = ctdb_ltdb_unlock(ctdb_db, key);
406                 if (ret != 0) {
407                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
408                 }
409
410                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
411                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
412                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", max_call_latency, dstate->start_time);
413                 return;
414         }
415
416         call->call_id = c->callid;
417         call->key = key;
418         call->call_data.dptr = c->data + c->keylen;
419         call->call_data.dsize = c->calldatalen;
420         call->flags = c->flags;
421
422         if (header.dmaster == ctdb->pnn) {
423                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
424         } else {
425                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
426         }
427
428         ret = ctdb_ltdb_unlock(ctdb_db, key);
429         if (ret != 0) {
430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
431         }
432
433         if (state == NULL) {
434                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
435                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
436                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", max_call_latency, dstate->start_time);
437                 return;
438         }
439         talloc_steal(state, dstate);
440         talloc_steal(client, state);
441
442         state->async.fn = daemon_call_from_client_callback;
443         state->async.private_data = dstate;
444 }
445
446
447 static void daemon_request_control_from_client(struct ctdb_client *client, 
448                                                struct ctdb_req_control *c);
449
450 /* data contains a packet from the client */
451 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
452 {
453         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
454         TALLOC_CTX *tmp_ctx;
455         struct ctdb_context *ctdb = client->ctdb;
456
457         /* place the packet as a child of a tmp_ctx. We then use
458            talloc_free() below to free it. If any of the calls want
459            to keep it, then they will steal it somewhere else, and the
460            talloc_free() will be a no-op */
461         tmp_ctx = talloc_new(client);
462         talloc_steal(tmp_ctx, hdr);
463
464         if (hdr->ctdb_magic != CTDB_MAGIC) {
465                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
466                 goto done;
467         }
468
469         if (hdr->ctdb_version != CTDB_VERSION) {
470                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
471                 goto done;
472         }
473
474         switch (hdr->operation) {
475         case CTDB_REQ_CALL:
476                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
477                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
478                 break;
479
480         case CTDB_REQ_MESSAGE:
481                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
482                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
483                 break;
484
485         case CTDB_REQ_CONTROL:
486                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
487                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
488                 break;
489
490         default:
491                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
492                          hdr->operation));
493         }
494
495 done:
496         talloc_free(tmp_ctx);
497 }
498
499 /*
500   called when the daemon gets a incoming packet
501  */
502 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
503 {
504         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
505         struct ctdb_req_header *hdr;
506
507         if (cnt == 0) {
508                 talloc_free(client);
509                 return;
510         }
511
512         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
513
514         if (cnt < sizeof(*hdr)) {
515                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
516                                (unsigned)cnt);
517                 return;
518         }
519         hdr = (struct ctdb_req_header *)data;
520         if (cnt != hdr->length) {
521                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
522                                (unsigned)hdr->length, (unsigned)cnt);
523                 return;
524         }
525
526         if (hdr->ctdb_magic != CTDB_MAGIC) {
527                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
528                 return;
529         }
530
531         if (hdr->ctdb_version != CTDB_VERSION) {
532                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
533                 return;
534         }
535
536         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
537                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
538                  hdr->srcnode, hdr->destnode));
539
540         /* it is the responsibility of the incoming packet function to free 'data' */
541         daemon_incoming_packet(client, hdr);
542 }
543
544
545 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
546 {
547         if (client_pid->ctdb->client_pids != NULL) {
548                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
549         }
550
551         return 0;
552 }
553
554
555 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
556                          uint16_t flags, void *private_data)
557 {
558         struct sockaddr_un addr;
559         socklen_t len;
560         int fd;
561         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
562         struct ctdb_client *client;
563         struct ctdb_client_pid_list *client_pid;
564 #ifdef _AIX
565         struct peercred_struct cr;
566         socklen_t crl = sizeof(struct peercred_struct);
567 #else
568         struct ucred cr;
569         socklen_t crl = sizeof(struct ucred);
570 #endif
571
572         memset(&addr, 0, sizeof(addr));
573         len = sizeof(addr);
574         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
575         if (fd == -1) {
576                 return;
577         }
578
579         set_nonblocking(fd);
580         set_close_on_exec(fd);
581
582         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
583
584         client = talloc_zero(ctdb, struct ctdb_client);
585 #ifdef _AIX
586         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
587 #else
588         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
589 #endif
590                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
591         }
592
593         client->ctdb = ctdb;
594         client->fd = fd;
595         client->client_id = ctdb_reqid_new(ctdb, client);
596         client->pid = cr.pid;
597
598         client_pid = talloc(client, struct ctdb_client_pid_list);
599         if (client_pid == NULL) {
600                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
601                 close(fd);
602                 talloc_free(client);
603                 return;
604         }               
605         client_pid->ctdb   = ctdb;
606         client_pid->pid    = cr.pid;
607         client_pid->client = client;
608
609         DLIST_ADD(ctdb->client_pids, client_pid);
610
611         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
612                                          ctdb_daemon_read_cb, client,
613                                          "client-%u", client->pid);
614
615         talloc_set_destructor(client, ctdb_client_destructor);
616         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
617         CTDB_INCREMENT_STAT(ctdb, num_clients);
618 }
619
620
621
622 /*
623   create a unix domain socket and bind it
624   return a file descriptor open on the socket 
625 */
626 static int ux_socket_bind(struct ctdb_context *ctdb)
627 {
628         struct sockaddr_un addr;
629
630         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
631         if (ctdb->daemon.sd == -1) {
632                 return -1;
633         }
634
635         set_close_on_exec(ctdb->daemon.sd);
636         set_nonblocking(ctdb->daemon.sd);
637
638         memset(&addr, 0, sizeof(addr));
639         addr.sun_family = AF_UNIX;
640         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
641
642         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
643                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
644                 goto failed;
645         }       
646
647         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
648             chmod(ctdb->daemon.name, 0700) != 0) {
649                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
650                 goto failed;
651         } 
652
653
654         if (listen(ctdb->daemon.sd, 100) != 0) {
655                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
656                 goto failed;
657         }
658
659         return 0;
660
661 failed:
662         close(ctdb->daemon.sd);
663         ctdb->daemon.sd = -1;
664         return -1;      
665 }
666
667 static void sig_child_handler(struct event_context *ev,
668         struct signal_event *se, int signum, int count,
669         void *dont_care, 
670         void *private_data)
671 {
672 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
673         int status;
674         pid_t pid = -1;
675
676         while (pid != 0) {
677                 pid = waitpid(-1, &status, WNOHANG);
678                 if (pid == -1) {
679                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
680                         return;
681                 }
682                 if (pid > 0) {
683                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
684                 }
685         }
686 }
687
688 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
689                                       void *private_data)
690 {
691         if (status != 0) {
692                 ctdb_fatal(ctdb, "Failed to run setup event\n");
693                 return;
694         }
695         ctdb_run_notification_script(ctdb, "setup");
696
697         /* tell all other nodes we've just started up */
698         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
699                                  0, CTDB_CONTROL_STARTUP, 0,
700                                  CTDB_CTRL_FLAG_NOREPLY,
701                                  tdb_null, NULL, NULL);
702 }
703
704 /*
705   start the protocol going as a daemon
706 */
707 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
708 {
709         int res, ret = -1;
710         struct fd_event *fde;
711         const char *domain_socket_name;
712         struct signal_event *se;
713
714         /* get rid of any old sockets */
715         unlink(ctdb->daemon.name);
716
717         /* create a unix domain stream socket to listen to */
718         res = ux_socket_bind(ctdb);
719         if (res!=0) {
720                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
721                 exit(10);
722         }
723
724         if (do_fork && fork()) {
725                 return 0;
726         }
727
728         tdb_reopen_all(False);
729
730         if (do_fork) {
731                 setsid();
732                 close(0);
733                 if (open("/dev/null", O_RDONLY) != 0) {
734                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
735                         exit(11);
736                 }
737         }
738         block_signal(SIGPIPE);
739
740         ctdbd_pid = getpid();
741
742
743         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
744
745         ctdb_high_priority(ctdb);
746
747         /* ensure the socket is deleted on exit of the daemon */
748         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
749         if (domain_socket_name == NULL) {
750                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
751                 exit(12);
752         }
753
754         ctdb->ev = event_context_init(NULL);
755         tevent_loop_allow_nesting(ctdb->ev);
756         ret = ctdb_init_tevent_logging(ctdb);
757         if (ret != 0) {
758                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
759                 exit(1);
760         }
761
762         ctdb_set_child_logging(ctdb);
763
764         /* initialize statistics collection */
765         ctdb_statistics_init(ctdb);
766
767         /* force initial recovery for election */
768         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
769
770         if (strcmp(ctdb->transport, "tcp") == 0) {
771                 int ctdb_tcp_init(struct ctdb_context *);
772                 ret = ctdb_tcp_init(ctdb);
773         }
774 #ifdef USE_INFINIBAND
775         if (strcmp(ctdb->transport, "ib") == 0) {
776                 int ctdb_ibw_init(struct ctdb_context *);
777                 ret = ctdb_ibw_init(ctdb);
778         }
779 #endif
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
782                 return -1;
783         }
784
785         if (ctdb->methods == NULL) {
786                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
787                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
788         }
789
790         /* initialise the transport  */
791         if (ctdb->methods->initialise(ctdb) != 0) {
792                 ctdb_fatal(ctdb, "transport failed to initialise");
793         }
794
795         /* attach to existing databases */
796         if (ctdb_attach_databases(ctdb) != 0) {
797                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
798         }
799
800         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
801         if (ret != 0) {
802                 ctdb_fatal(ctdb, "Failed to run init event\n");
803         }
804         ctdb_run_notification_script(ctdb, "init");
805
806         /* start frozen, then let the first election sort things out */
807         if (ctdb_blocking_freeze(ctdb)) {
808                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
809         }
810
811         /* now start accepting clients, only can do this once frozen */
812         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
813                            EVENT_FD_READ,
814                            ctdb_accept_client, ctdb);
815         tevent_fd_set_auto_close(fde);
816
817         /* release any IPs we hold from previous runs of the daemon */
818         ctdb_release_all_ips(ctdb);
819
820         /* start the transport going */
821         ctdb_start_transport(ctdb);
822
823         /* set up a handler to pick up sigchld */
824         se = event_add_signal(ctdb->ev, ctdb,
825                                      SIGCHLD, 0,
826                                      sig_child_handler,
827                                      ctdb);
828         if (se == NULL) {
829                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
830                 exit(1);
831         }
832
833         ret = ctdb_event_script_callback(ctdb,
834                                          ctdb,
835                                          ctdb_setup_event_callback,
836                                          ctdb,
837                                          false,
838                                          CTDB_EVENT_SETUP,
839                                          "");
840         if (ret != 0) {
841                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
842                 exit(1);
843         }
844
845         if (use_syslog) {
846                 if (start_syslog_daemon(ctdb)) {
847                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
848                         exit(10);
849                 }
850         }
851
852         ctdb_lockdown_memory(ctdb);
853           
854         /* go into a wait loop to allow other nodes to complete */
855         event_loop_wait(ctdb->ev);
856
857         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
858         exit(1);
859 }
860
861 /*
862   allocate a packet for use in daemon<->daemon communication
863  */
864 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
865                                                  TALLOC_CTX *mem_ctx, 
866                                                  enum ctdb_operation operation, 
867                                                  size_t length, size_t slength,
868                                                  const char *type)
869 {
870         int size;
871         struct ctdb_req_header *hdr;
872
873         length = MAX(length, slength);
874         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
875
876         if (ctdb->methods == NULL) {
877                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
878                          operation, (unsigned)length));
879                 return NULL;
880         }
881
882         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
883         if (hdr == NULL) {
884                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
885                          operation, (unsigned)length));
886                 return NULL;
887         }
888         talloc_set_name_const(hdr, type);
889         memset(hdr, 0, slength);
890         hdr->length       = length;
891         hdr->operation    = operation;
892         hdr->ctdb_magic   = CTDB_MAGIC;
893         hdr->ctdb_version = CTDB_VERSION;
894         hdr->generation   = ctdb->vnn_map->generation;
895         hdr->srcnode      = ctdb->pnn;
896
897         return hdr;     
898 }
899
900 struct daemon_control_state {
901         struct daemon_control_state *next, *prev;
902         struct ctdb_client *client;
903         struct ctdb_req_control *c;
904         uint32_t reqid;
905         struct ctdb_node *node;
906 };
907
908 /*
909   callback when a control reply comes in
910  */
911 static void daemon_control_callback(struct ctdb_context *ctdb,
912                                     int32_t status, TDB_DATA data, 
913                                     const char *errormsg,
914                                     void *private_data)
915 {
916         struct daemon_control_state *state = talloc_get_type(private_data, 
917                                                              struct daemon_control_state);
918         struct ctdb_client *client = state->client;
919         struct ctdb_reply_control *r;
920         size_t len;
921         int ret;
922
923         /* construct a message to send to the client containing the data */
924         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
925         if (errormsg) {
926                 len += strlen(errormsg);
927         }
928         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
929                                struct ctdb_reply_control);
930         CTDB_NO_MEMORY_VOID(ctdb, r);
931
932         r->hdr.reqid     = state->reqid;
933         r->status        = status;
934         r->datalen       = data.dsize;
935         r->errorlen = 0;
936         memcpy(&r->data[0], data.dptr, data.dsize);
937         if (errormsg) {
938                 r->errorlen = strlen(errormsg);
939                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
940         }
941
942         ret = daemon_queue_send(client, &r->hdr);
943         if (ret != -1) {
944                 talloc_free(state);
945         }
946 }
947
948 /*
949   fail all pending controls to a disconnected node
950  */
951 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
952 {
953         struct daemon_control_state *state;
954         while ((state = node->pending_controls)) {
955                 DLIST_REMOVE(node->pending_controls, state);
956                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
957                                         "node is disconnected", state);
958         }
959 }
960
961 /*
962   destroy a daemon_control_state
963  */
964 static int daemon_control_destructor(struct daemon_control_state *state)
965 {
966         if (state->node) {
967                 DLIST_REMOVE(state->node->pending_controls, state);
968         }
969         return 0;
970 }
971
972 /*
973   this is called when the ctdb daemon received a ctdb request control
974   from a local client over the unix domain socket
975  */
976 static void daemon_request_control_from_client(struct ctdb_client *client, 
977                                                struct ctdb_req_control *c)
978 {
979         TDB_DATA data;
980         int res;
981         struct daemon_control_state *state;
982         TALLOC_CTX *tmp_ctx = talloc_new(client);
983
984         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
985                 c->hdr.destnode = client->ctdb->pnn;
986         }
987
988         state = talloc(client, struct daemon_control_state);
989         CTDB_NO_MEMORY_VOID(client->ctdb, state);
990
991         state->client = client;
992         state->c = talloc_steal(state, c);
993         state->reqid = c->hdr.reqid;
994         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
995                 state->node = client->ctdb->nodes[c->hdr.destnode];
996                 DLIST_ADD(state->node->pending_controls, state);
997         } else {
998                 state->node = NULL;
999         }
1000
1001         talloc_set_destructor(state, daemon_control_destructor);
1002
1003         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1004                 talloc_steal(tmp_ctx, state);
1005         }
1006         
1007         data.dptr = &c->data[0];
1008         data.dsize = c->datalen;
1009         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1010                                        c->srvid, c->opcode, client->client_id,
1011                                        c->flags,
1012                                        data, daemon_control_callback,
1013                                        state);
1014         if (res != 0) {
1015                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1016                          c->hdr.destnode));
1017         }
1018
1019         talloc_free(tmp_ctx);
1020 }
1021
1022 /*
1023   register a call function
1024 */
1025 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1026                          ctdb_fn_t fn, int id)
1027 {
1028         struct ctdb_registered_call *call;
1029         struct ctdb_db_context *ctdb_db;
1030
1031         ctdb_db = find_ctdb_db(ctdb, db_id);
1032         if (ctdb_db == NULL) {
1033                 return -1;
1034         }
1035
1036         call = talloc(ctdb_db, struct ctdb_registered_call);
1037         call->fn = fn;
1038         call->id = id;
1039
1040         DLIST_ADD(ctdb_db->calls, call);        
1041         return 0;
1042 }
1043
1044
1045
1046 /*
1047   this local messaging handler is ugly, but is needed to prevent
1048   recursion in ctdb_send_message() when the destination node is the
1049   same as the source node
1050  */
1051 struct ctdb_local_message {
1052         struct ctdb_context *ctdb;
1053         uint64_t srvid;
1054         TDB_DATA data;
1055 };
1056
1057 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1058                                        struct timeval t, void *private_data)
1059 {
1060         struct ctdb_local_message *m = talloc_get_type(private_data, 
1061                                                        struct ctdb_local_message);
1062         int res;
1063
1064         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1065         if (res != 0) {
1066                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1067                           (unsigned long long)m->srvid));
1068         }
1069         talloc_free(m);
1070 }
1071
1072 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1073 {
1074         struct ctdb_local_message *m;
1075         m = talloc(ctdb, struct ctdb_local_message);
1076         CTDB_NO_MEMORY(ctdb, m);
1077
1078         m->ctdb = ctdb;
1079         m->srvid = srvid;
1080         m->data  = data;
1081         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1082         if (m->data.dptr == NULL) {
1083                 talloc_free(m);
1084                 return -1;
1085         }
1086
1087         /* this needs to be done as an event to prevent recursion */
1088         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1089         return 0;
1090 }
1091
1092 /*
1093   send a ctdb message
1094 */
1095 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1096                              uint64_t srvid, TDB_DATA data)
1097 {
1098         struct ctdb_req_message *r;
1099         int len;
1100
1101         if (ctdb->methods == NULL) {
1102                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1103                 return -1;
1104         }
1105
1106         /* see if this is a message to ourselves */
1107         if (pnn == ctdb->pnn) {
1108                 return ctdb_local_message(ctdb, srvid, data);
1109         }
1110
1111         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1112         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1113                                     struct ctdb_req_message);
1114         CTDB_NO_MEMORY(ctdb, r);
1115
1116         r->hdr.destnode  = pnn;
1117         r->srvid         = srvid;
1118         r->datalen       = data.dsize;
1119         memcpy(&r->data[0], data.dptr, data.dsize);
1120
1121         ctdb_queue_packet(ctdb, &r->hdr);
1122
1123         talloc_free(r);
1124         return 0;
1125 }
1126
1127
1128
1129 struct ctdb_client_notify_list {
1130         struct ctdb_client_notify_list *next, *prev;
1131         struct ctdb_context *ctdb;
1132         uint64_t srvid;
1133         TDB_DATA data;
1134 };
1135
1136
1137 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1138 {
1139         int ret;
1140
1141         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1142
1143         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1144         if (ret != 0) {
1145                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1146         }
1147
1148         return 0;
1149 }
1150
1151 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1152 {
1153         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1154         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1155         struct ctdb_client_notify_list *nl;
1156
1157         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1158
1159         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1160                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1161                 return -1;
1162         }
1163
1164         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1165                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1166                 return -1;
1167         }
1168
1169
1170         if (client == NULL) {
1171                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1172                 return -1;
1173         }
1174
1175         for(nl=client->notify; nl; nl=nl->next) {
1176                 if (nl->srvid == notify->srvid) {
1177                         break;
1178                 }
1179         }
1180         if (nl != NULL) {
1181                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1182                 return -1;
1183         }
1184
1185         nl = talloc(client, struct ctdb_client_notify_list);
1186         CTDB_NO_MEMORY(ctdb, nl);
1187         nl->ctdb       = ctdb;
1188         nl->srvid      = notify->srvid;
1189         nl->data.dsize = notify->len;
1190         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1191         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1192         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1193         
1194         DLIST_ADD(client->notify, nl);
1195         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1196
1197         return 0;
1198 }
1199
1200 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1201 {
1202         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1203         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1204         struct ctdb_client_notify_list *nl;
1205
1206         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1207
1208         if (client == NULL) {
1209                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1210                 return -1;
1211         }
1212
1213         for(nl=client->notify; nl; nl=nl->next) {
1214                 if (nl->srvid == notify->srvid) {
1215                         break;
1216                 }
1217         }
1218         if (nl == NULL) {
1219                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1220                 return -1;
1221         }
1222
1223         DLIST_REMOVE(client->notify, nl);
1224         talloc_set_destructor(nl, NULL);
1225         talloc_free(nl);
1226
1227         return 0;
1228 }
1229
1230 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1231 {
1232         struct ctdb_client_pid_list *client_pid;
1233
1234         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1235                 if (client_pid->pid == pid) {
1236                         return client_pid->client;
1237                 }
1238         }
1239         return NULL;
1240 }
1241
1242
1243 /* This control is used by samba when probing if a process (of a samba daemon)
1244    exists on the node.
1245    Samba does this when it needs/wants to check if a subrecord in one of the
1246    databases is still valied, or if it is stale and can be removed.
1247    If the node is in unhealthy or stopped state we just kill of the samba
1248    process holding htis sub-record and return to the calling samba that
1249    the process does not exist.
1250    This allows us to forcefully recall subrecords registered by samba processes
1251    on banned and stopped nodes.
1252 */
1253 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1254 {
1255         struct ctdb_client *client;
1256
1257         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1258                 client = ctdb_find_client_by_pid(ctdb, pid);
1259                 if (client != NULL) {
1260                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1261                         talloc_free(client);
1262                 }
1263                 return -1;
1264         }
1265
1266         return kill(pid, 0);
1267 }