lower the loglevel for the message that a client has attached through a domian socket
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
104                         return 0;
105                 }
106         }
107         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
108 }
109
110 /*
111   message handler for when we are in daemon mode. This redirects the message
112   to the right client
113  */
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
115                                     TDB_DATA data, void *private_data)
116 {
117         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118         struct ctdb_req_message *r;
119         int len;
120
121         /* construct a message to send to the client containing the data */
122         len = offsetof(struct ctdb_req_message, data) + data.dsize;
123         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
124                                len, struct ctdb_req_message);
125         CTDB_NO_MEMORY_VOID(ctdb, r);
126
127         talloc_set_name_const(r, "req_message packet");
128
129         r->srvid         = srvid;
130         r->datalen       = data.dsize;
131         memcpy(&r->data[0], data.dptr, data.dsize);
132
133         daemon_queue_send(client, &r->hdr);
134
135         talloc_free(r);
136 }
137                                            
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         if (client->ctdb->statistics.num_clients) {
188                 client->ctdb->statistics.num_clients--;
189         }
190
191         if (client->num_persistent_updates != 0) {
192                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194         }
195         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196         if (ctdb_db) {
197                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198                                   "commit active. Forcing recovery.\n"));
199                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200                 ctdb_db->transaction_active = false;
201         }
202
203         return 0;
204 }
205
206
207 /*
208   this is called when the ctdb daemon received a ctdb request message
209   from a local client over the unix domain socket
210  */
211 static void daemon_request_message_from_client(struct ctdb_client *client, 
212                                                struct ctdb_req_message *c)
213 {
214         TDB_DATA data;
215         int res;
216
217         /* maybe the message is for another client on this node */
218         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
220                 return;
221         }
222
223         /* its for a remote node */
224         data.dptr = &c->data[0];
225         data.dsize = c->datalen;
226         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227                                        c->srvid, data);
228         if (res != 0) {
229                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
230                          c->hdr.destnode));
231         }
232 }
233
234
235 struct daemon_call_state {
236         struct ctdb_client *client;
237         uint32_t reqid;
238         struct ctdb_call *call;
239         struct timeval start_time;
240 };
241
242 /* 
243    complete a call from a client 
244 */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 {
247         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
248                                                            struct daemon_call_state);
249         struct ctdb_reply_call *r;
250         int res;
251         uint32_t length;
252         struct ctdb_client *client = dstate->client;
253         struct ctdb_db_context *ctdb_db = state->ctdb_db;
254
255         talloc_steal(client, dstate);
256         talloc_steal(dstate, dstate->call);
257
258         res = ctdb_daemon_call_recv(state, dstate->call);
259         if (res != 0) {
260                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261                 if (client->ctdb->statistics.pending_calls > 0) {
262                         client->ctdb->statistics.pending_calls--;
263                 }
264                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
265                 return;
266         }
267
268         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
270                                length, struct ctdb_reply_call);
271         if (r == NULL) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273                 if (client->ctdb->statistics.pending_calls > 0) {
274                         client->ctdb->statistics.pending_calls--;
275                 }
276                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
277                 return;
278         }
279         r->hdr.reqid        = dstate->reqid;
280         r->datalen          = dstate->call->reply_data.dsize;
281         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282
283         res = daemon_queue_send(client, &r->hdr);
284         if (res != 0) {
285                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
286         }
287         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
288         talloc_free(dstate);
289         if (client->ctdb->statistics.pending_calls > 0) {
290                 client->ctdb->statistics.pending_calls--;
291         }
292 }
293
294 struct ctdb_daemon_packet_wrap {
295         struct ctdb_context *ctdb;
296         uint32_t client_id;
297 };
298
299 /*
300   a wrapper to catch disconnected clients
301  */
302 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
303 {
304         struct ctdb_client *client;
305         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
306                                                             struct ctdb_daemon_packet_wrap);
307         if (w == NULL) {
308                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
309                 return;
310         }
311
312         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
313         if (client == NULL) {
314                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
315                          w->client_id));
316                 talloc_free(w);
317                 return;
318         }
319         talloc_free(w);
320
321         /* process it */
322         daemon_incoming_packet(client, hdr);    
323 }
324
325
326 /*
327   this is called when the ctdb daemon received a ctdb request call
328   from a local client over the unix domain socket
329  */
330 static void daemon_request_call_from_client(struct ctdb_client *client, 
331                                             struct ctdb_req_call *c)
332 {
333         struct ctdb_call_state *state;
334         struct ctdb_db_context *ctdb_db;
335         struct daemon_call_state *dstate;
336         struct ctdb_call *call;
337         struct ctdb_ltdb_header header;
338         TDB_DATA key, data;
339         int ret;
340         struct ctdb_context *ctdb = client->ctdb;
341         struct ctdb_daemon_packet_wrap *w;
342
343         ctdb->statistics.total_calls++;
344         if (client->ctdb->statistics.pending_calls > 0) {
345                 ctdb->statistics.pending_calls++;
346         }
347
348         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
349         if (!ctdb_db) {
350                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
351                           c->db_id));
352                 if (client->ctdb->statistics.pending_calls > 0) {
353                         ctdb->statistics.pending_calls--;
354                 }
355                 return;
356         }
357
358         key.dptr = c->data;
359         key.dsize = c->keylen;
360
361         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
362         CTDB_NO_MEMORY_VOID(ctdb, w);   
363
364         w->ctdb = ctdb;
365         w->client_id = client->client_id;
366
367         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
368                                            (struct ctdb_req_header *)c, &data,
369                                            daemon_incoming_packet_wrap, w, True);
370         if (ret == -2) {
371                 /* will retry later */
372                 if (client->ctdb->statistics.pending_calls > 0) {
373                         ctdb->statistics.pending_calls--;
374                 }
375                 return;
376         }
377
378         talloc_free(w);
379
380         if (ret != 0) {
381                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
382                 if (client->ctdb->statistics.pending_calls > 0) {
383                         ctdb->statistics.pending_calls--;
384                 }
385                 return;
386         }
387
388         dstate = talloc(client, struct daemon_call_state);
389         if (dstate == NULL) {
390                 ctdb_ltdb_unlock(ctdb_db, key);
391                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
392                 if (client->ctdb->statistics.pending_calls > 0) {
393                         ctdb->statistics.pending_calls--;
394                 }
395                 return;
396         }
397         dstate->start_time = timeval_current();
398         dstate->client = client;
399         dstate->reqid  = c->hdr.reqid;
400         talloc_steal(dstate, data.dptr);
401
402         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
403         if (call == NULL) {
404                 ctdb_ltdb_unlock(ctdb_db, key);
405                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
406                 if (client->ctdb->statistics.pending_calls > 0) {
407                         ctdb->statistics.pending_calls--;
408                 }
409                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
410                 return;
411         }
412
413         call->call_id = c->callid;
414         call->key = key;
415         call->call_data.dptr = c->data + c->keylen;
416         call->call_data.dsize = c->calldatalen;
417         call->flags = c->flags;
418
419         if (header.dmaster == ctdb->pnn) {
420                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
421         } else {
422                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
423         }
424
425         ctdb_ltdb_unlock(ctdb_db, key);
426
427         if (state == NULL) {
428                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
429                 if (client->ctdb->statistics.pending_calls > 0) {
430                         ctdb->statistics.pending_calls--;
431                 }
432                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
433                 return;
434         }
435         talloc_steal(state, dstate);
436         talloc_steal(client, state);
437
438         state->async.fn = daemon_call_from_client_callback;
439         state->async.private_data = dstate;
440 }
441
442
443 static void daemon_request_control_from_client(struct ctdb_client *client, 
444                                                struct ctdb_req_control *c);
445
446 /* data contains a packet from the client */
447 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
448 {
449         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
450         TALLOC_CTX *tmp_ctx;
451         struct ctdb_context *ctdb = client->ctdb;
452
453         /* place the packet as a child of a tmp_ctx. We then use
454            talloc_free() below to free it. If any of the calls want
455            to keep it, then they will steal it somewhere else, and the
456            talloc_free() will be a no-op */
457         tmp_ctx = talloc_new(client);
458         talloc_steal(tmp_ctx, hdr);
459
460         if (hdr->ctdb_magic != CTDB_MAGIC) {
461                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
462                 goto done;
463         }
464
465         if (hdr->ctdb_version != CTDB_VERSION) {
466                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
467                 goto done;
468         }
469
470         switch (hdr->operation) {
471         case CTDB_REQ_CALL:
472                 ctdb->statistics.client.req_call++;
473                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
474                 break;
475
476         case CTDB_REQ_MESSAGE:
477                 ctdb->statistics.client.req_message++;
478                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
479                 break;
480
481         case CTDB_REQ_CONTROL:
482                 ctdb->statistics.client.req_control++;
483                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
484                 break;
485
486         default:
487                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
488                          hdr->operation));
489         }
490
491 done:
492         talloc_free(tmp_ctx);
493 }
494
495 /*
496   called when the daemon gets a incoming packet
497  */
498 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
499 {
500         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
501         struct ctdb_req_header *hdr;
502
503         if (cnt == 0) {
504                 talloc_free(client);
505                 return;
506         }
507
508         client->ctdb->statistics.client_packets_recv++;
509
510         if (cnt < sizeof(*hdr)) {
511                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
512                                (unsigned)cnt);
513                 return;
514         }
515         hdr = (struct ctdb_req_header *)data;
516         if (cnt != hdr->length) {
517                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
518                                (unsigned)hdr->length, (unsigned)cnt);
519                 return;
520         }
521
522         if (hdr->ctdb_magic != CTDB_MAGIC) {
523                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
524                 return;
525         }
526
527         if (hdr->ctdb_version != CTDB_VERSION) {
528                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
529                 return;
530         }
531
532         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
533                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
534                  hdr->srcnode, hdr->destnode));
535
536         /* it is the responsibility of the incoming packet function to free 'data' */
537         daemon_incoming_packet(client, hdr);
538 }
539
540
541 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
542 {
543         if (client_pid->ctdb->client_pids != NULL) {
544                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
545         }
546
547         return 0;
548 }
549
550
551 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
552                          uint16_t flags, void *private_data)
553 {
554         struct sockaddr_un addr;
555         socklen_t len;
556         int fd;
557         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
558         struct ctdb_client *client;
559         struct ctdb_client_pid_list *client_pid;
560 #ifdef _AIX
561         struct peercred_struct cr;
562         socklen_t crl = sizeof(struct peercred_struct);
563 #else
564         struct ucred cr;
565         socklen_t crl = sizeof(struct ucred);
566 #endif
567
568         memset(&addr, 0, sizeof(addr));
569         len = sizeof(addr);
570         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
571         if (fd == -1) {
572                 return;
573         }
574
575         set_nonblocking(fd);
576         set_close_on_exec(fd);
577
578         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
579
580         client = talloc_zero(ctdb, struct ctdb_client);
581 #ifdef _AIX
582         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
583 #else
584         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
585 #endif
586                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
587         }
588
589         client->ctdb = ctdb;
590         client->fd = fd;
591         client->client_id = ctdb_reqid_new(ctdb, client);
592         client->pid = cr.pid;
593
594         client_pid = talloc(client, struct ctdb_client_pid_list);
595         if (client_pid == NULL) {
596                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
597                 close(fd);
598                 talloc_free(client);
599                 return;
600         }               
601         client_pid->ctdb   = ctdb;
602         client_pid->pid    = cr.pid;
603         client_pid->client = client;
604
605         DLIST_ADD(ctdb->client_pids, client_pid);
606
607         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
608                                          ctdb_daemon_read_cb, client);
609
610         talloc_set_destructor(client, ctdb_client_destructor);
611         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
612         ctdb->statistics.num_clients++;
613 }
614
615
616
617 /*
618   create a unix domain socket and bind it
619   return a file descriptor open on the socket 
620 */
621 static int ux_socket_bind(struct ctdb_context *ctdb)
622 {
623         struct sockaddr_un addr;
624
625         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
626         if (ctdb->daemon.sd == -1) {
627                 return -1;
628         }
629
630         set_close_on_exec(ctdb->daemon.sd);
631         set_nonblocking(ctdb->daemon.sd);
632
633         memset(&addr, 0, sizeof(addr));
634         addr.sun_family = AF_UNIX;
635         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
636
637         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
638                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
639                 goto failed;
640         }       
641
642         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
643             chmod(ctdb->daemon.name, 0700) != 0) {
644                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
645                 goto failed;
646         } 
647
648
649         if (listen(ctdb->daemon.sd, 100) != 0) {
650                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
651                 goto failed;
652         }
653
654         return 0;
655
656 failed:
657         close(ctdb->daemon.sd);
658         ctdb->daemon.sd = -1;
659         return -1;      
660 }
661
662 static void sig_child_handler(struct event_context *ev,
663         struct signal_event *se, int signum, int count,
664         void *dont_care, 
665         void *private_data)
666 {
667 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
668         int status;
669         pid_t pid = -1;
670
671         while (pid != 0) {
672                 pid = waitpid(-1, &status, WNOHANG);
673                 if (pid == -1) {
674                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
675                         return;
676                 }
677                 if (pid > 0) {
678                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
679                 }
680         }
681 }
682
683 /*
684   start the protocol going as a daemon
685 */
686 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
687 {
688         int res, ret = -1;
689         struct fd_event *fde;
690         const char *domain_socket_name;
691         struct signal_event *se;
692
693         /* get rid of any old sockets */
694         unlink(ctdb->daemon.name);
695
696         /* create a unix domain stream socket to listen to */
697         res = ux_socket_bind(ctdb);
698         if (res!=0) {
699                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
700                 exit(10);
701         }
702
703         if (do_fork && fork()) {
704                 return 0;
705         }
706
707         tdb_reopen_all(False);
708
709         if (do_fork) {
710                 setsid();
711                 close(0);
712                 if (open("/dev/null", O_RDONLY) != 0) {
713                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
714                         exit(11);
715                 }
716         }
717         block_signal(SIGPIPE);
718
719         ctdbd_pid = getpid();
720
721
722         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
723
724         if (ctdb->do_setsched) {
725                 /* try to set us up as realtime */
726                 ctdb_set_scheduler(ctdb);
727         }
728
729         /* ensure the socket is deleted on exit of the daemon */
730         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
731         if (domain_socket_name == NULL) {
732                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
733                 exit(12);
734         }
735
736         ctdb->ev = event_context_init(NULL);
737
738         ctdb_set_child_logging(ctdb);
739
740         /* force initial recovery for election */
741         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
742
743         if (strcmp(ctdb->transport, "tcp") == 0) {
744                 int ctdb_tcp_init(struct ctdb_context *);
745                 ret = ctdb_tcp_init(ctdb);
746         }
747 #ifdef USE_INFINIBAND
748         if (strcmp(ctdb->transport, "ib") == 0) {
749                 int ctdb_ibw_init(struct ctdb_context *);
750                 ret = ctdb_ibw_init(ctdb);
751         }
752 #endif
753         if (ret != 0) {
754                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
755                 return -1;
756         }
757
758         if (ctdb->methods == NULL) {
759                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
760                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
761         }
762
763         /* initialise the transport  */
764         if (ctdb->methods->initialise(ctdb) != 0) {
765                 ctdb_fatal(ctdb, "transport failed to initialise");
766         }
767
768         /* attach to any existing persistent databases */
769         if (ctdb_attach_persistent(ctdb) != 0) {
770                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
771         }
772
773         /* start frozen, then let the first election sort things out */
774         if (ctdb_blocking_freeze(ctdb)) {
775                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
776         }
777
778         /* now start accepting clients, only can do this once frozen */
779         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
780                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
781                            ctdb_accept_client, ctdb);
782
783         /* tell all other nodes we've just started up */
784         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
785                                  0, CTDB_CONTROL_STARTUP, 0,
786                                  CTDB_CTRL_FLAG_NOREPLY,
787                                  tdb_null, NULL, NULL);
788
789         /* release any IPs we hold from previous runs of the daemon */
790         ctdb_release_all_ips(ctdb);
791
792         /* start the transport going */
793         ctdb_start_transport(ctdb);
794
795         /* set up a handler to pick up sigchld */
796         se = event_add_signal(ctdb->ev, ctdb,
797                                      SIGCHLD, 0,
798                                      sig_child_handler,
799                                      ctdb);
800         if (se == NULL) {
801                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
802                 exit(1);
803         }
804
805         if (use_syslog) {
806                 if (start_syslog_daemon(ctdb)) {
807                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
808                         exit(10);
809                 }
810         }
811
812           
813         /* go into a wait loop to allow other nodes to complete */
814         event_loop_wait(ctdb->ev);
815
816         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
817         exit(1);
818 }
819
820 /*
821   allocate a packet for use in daemon<->daemon communication
822  */
823 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
824                                                  TALLOC_CTX *mem_ctx, 
825                                                  enum ctdb_operation operation, 
826                                                  size_t length, size_t slength,
827                                                  const char *type)
828 {
829         int size;
830         struct ctdb_req_header *hdr;
831
832         length = MAX(length, slength);
833         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
834
835         if (ctdb->methods == NULL) {
836                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
837                          operation, (unsigned)length));
838                 return NULL;
839         }
840
841         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
842         if (hdr == NULL) {
843                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
844                          operation, (unsigned)length));
845                 return NULL;
846         }
847         talloc_set_name_const(hdr, type);
848         memset(hdr, 0, slength);
849         hdr->length       = length;
850         hdr->operation    = operation;
851         hdr->ctdb_magic   = CTDB_MAGIC;
852         hdr->ctdb_version = CTDB_VERSION;
853         hdr->generation   = ctdb->vnn_map->generation;
854         hdr->srcnode      = ctdb->pnn;
855
856         return hdr;     
857 }
858
859 struct daemon_control_state {
860         struct daemon_control_state *next, *prev;
861         struct ctdb_client *client;
862         struct ctdb_req_control *c;
863         uint32_t reqid;
864         struct ctdb_node *node;
865 };
866
867 /*
868   callback when a control reply comes in
869  */
870 static void daemon_control_callback(struct ctdb_context *ctdb,
871                                     int32_t status, TDB_DATA data, 
872                                     const char *errormsg,
873                                     void *private_data)
874 {
875         struct daemon_control_state *state = talloc_get_type(private_data, 
876                                                              struct daemon_control_state);
877         struct ctdb_client *client = state->client;
878         struct ctdb_reply_control *r;
879         size_t len;
880
881         /* construct a message to send to the client containing the data */
882         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
883         if (errormsg) {
884                 len += strlen(errormsg);
885         }
886         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
887                                struct ctdb_reply_control);
888         CTDB_NO_MEMORY_VOID(ctdb, r);
889
890         r->hdr.reqid     = state->reqid;
891         r->status        = status;
892         r->datalen       = data.dsize;
893         r->errorlen = 0;
894         memcpy(&r->data[0], data.dptr, data.dsize);
895         if (errormsg) {
896                 r->errorlen = strlen(errormsg);
897                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
898         }
899
900         daemon_queue_send(client, &r->hdr);
901
902         talloc_free(state);
903 }
904
905 /*
906   fail all pending controls to a disconnected node
907  */
908 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
909 {
910         struct daemon_control_state *state;
911         while ((state = node->pending_controls)) {
912                 DLIST_REMOVE(node->pending_controls, state);
913                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
914                                         "node is disconnected", state);
915         }
916 }
917
918 /*
919   destroy a daemon_control_state
920  */
921 static int daemon_control_destructor(struct daemon_control_state *state)
922 {
923         if (state->node) {
924                 DLIST_REMOVE(state->node->pending_controls, state);
925         }
926         return 0;
927 }
928
929 /*
930   this is called when the ctdb daemon received a ctdb request control
931   from a local client over the unix domain socket
932  */
933 static void daemon_request_control_from_client(struct ctdb_client *client, 
934                                                struct ctdb_req_control *c)
935 {
936         TDB_DATA data;
937         int res;
938         struct daemon_control_state *state;
939         TALLOC_CTX *tmp_ctx = talloc_new(client);
940
941         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
942                 c->hdr.destnode = client->ctdb->pnn;
943         }
944
945         state = talloc(client, struct daemon_control_state);
946         CTDB_NO_MEMORY_VOID(client->ctdb, state);
947
948         state->client = client;
949         state->c = talloc_steal(state, c);
950         state->reqid = c->hdr.reqid;
951         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
952                 state->node = client->ctdb->nodes[c->hdr.destnode];
953                 DLIST_ADD(state->node->pending_controls, state);
954         } else {
955                 state->node = NULL;
956         }
957
958         talloc_set_destructor(state, daemon_control_destructor);
959
960         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
961                 talloc_steal(tmp_ctx, state);
962         }
963         
964         data.dptr = &c->data[0];
965         data.dsize = c->datalen;
966         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
967                                        c->srvid, c->opcode, client->client_id,
968                                        c->flags,
969                                        data, daemon_control_callback,
970                                        state);
971         if (res != 0) {
972                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
973                          c->hdr.destnode));
974         }
975
976         talloc_free(tmp_ctx);
977 }
978
979 /*
980   register a call function
981 */
982 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
983                          ctdb_fn_t fn, int id)
984 {
985         struct ctdb_registered_call *call;
986         struct ctdb_db_context *ctdb_db;
987
988         ctdb_db = find_ctdb_db(ctdb, db_id);
989         if (ctdb_db == NULL) {
990                 return -1;
991         }
992
993         call = talloc(ctdb_db, struct ctdb_registered_call);
994         call->fn = fn;
995         call->id = id;
996
997         DLIST_ADD(ctdb_db->calls, call);        
998         return 0;
999 }
1000
1001
1002
1003 /*
1004   this local messaging handler is ugly, but is needed to prevent
1005   recursion in ctdb_send_message() when the destination node is the
1006   same as the source node
1007  */
1008 struct ctdb_local_message {
1009         struct ctdb_context *ctdb;
1010         uint64_t srvid;
1011         TDB_DATA data;
1012 };
1013
1014 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1015                                        struct timeval t, void *private_data)
1016 {
1017         struct ctdb_local_message *m = talloc_get_type(private_data, 
1018                                                        struct ctdb_local_message);
1019         int res;
1020
1021         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1022         if (res != 0) {
1023                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1024                           (unsigned long long)m->srvid));
1025         }
1026         talloc_free(m);
1027 }
1028
1029 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1030 {
1031         struct ctdb_local_message *m;
1032         m = talloc(ctdb, struct ctdb_local_message);
1033         CTDB_NO_MEMORY(ctdb, m);
1034
1035         m->ctdb = ctdb;
1036         m->srvid = srvid;
1037         m->data  = data;
1038         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1039         if (m->data.dptr == NULL) {
1040                 talloc_free(m);
1041                 return -1;
1042         }
1043
1044         /* this needs to be done as an event to prevent recursion */
1045         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1046         return 0;
1047 }
1048
1049 /*
1050   send a ctdb message
1051 */
1052 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1053                              uint64_t srvid, TDB_DATA data)
1054 {
1055         struct ctdb_req_message *r;
1056         int len;
1057
1058         if (ctdb->methods == NULL) {
1059                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1060                 return -1;
1061         }
1062
1063         /* see if this is a message to ourselves */
1064         if (pnn == ctdb->pnn) {
1065                 return ctdb_local_message(ctdb, srvid, data);
1066         }
1067
1068         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1069         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1070                                     struct ctdb_req_message);
1071         CTDB_NO_MEMORY(ctdb, r);
1072
1073         r->hdr.destnode  = pnn;
1074         r->srvid         = srvid;
1075         r->datalen       = data.dsize;
1076         memcpy(&r->data[0], data.dptr, data.dsize);
1077
1078         ctdb_queue_packet(ctdb, &r->hdr);
1079
1080         talloc_free(r);
1081         return 0;
1082 }
1083
1084
1085
1086 struct ctdb_client_notify_list {
1087         struct ctdb_client_notify_list *next, *prev;
1088         struct ctdb_context *ctdb;
1089         uint64_t srvid;
1090         TDB_DATA data;
1091 };
1092
1093
1094 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1095 {
1096         int ret;
1097
1098         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1099
1100         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1101         if (ret != 0) {
1102                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1103         }
1104
1105         return 0;
1106 }
1107
1108 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1109 {
1110         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1111         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1112         struct ctdb_client_notify_list *nl;
1113
1114         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1115
1116         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1118                 return -1;
1119         }
1120
1121         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1122                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1123                 return -1;
1124         }
1125
1126
1127         if (client == NULL) {
1128                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1129                 return -1;
1130         }
1131
1132         for(nl=client->notify; nl; nl=nl->next) {
1133                 if (nl->srvid == notify->srvid) {
1134                         break;
1135                 }
1136         }
1137         if (nl != NULL) {
1138                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1139                 return -1;
1140         }
1141
1142         nl = talloc(client, struct ctdb_client_notify_list);
1143         CTDB_NO_MEMORY(ctdb, nl);
1144         nl->ctdb       = ctdb;
1145         nl->srvid      = notify->srvid;
1146         nl->data.dsize = notify->len;
1147         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1148         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1149         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1150         
1151         DLIST_ADD(client->notify, nl);
1152         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1153
1154         return 0;
1155 }
1156
1157 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1158 {
1159         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1160         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1161         struct ctdb_client_notify_list *nl;
1162
1163         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1164
1165         if (client == NULL) {
1166                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1167                 return -1;
1168         }
1169
1170         for(nl=client->notify; nl; nl=nl->next) {
1171                 if (nl->srvid == notify->srvid) {
1172                         break;
1173                 }
1174         }
1175         if (nl == NULL) {
1176                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1177                 return -1;
1178         }
1179
1180         DLIST_REMOVE(client->notify, nl);
1181         talloc_set_destructor(nl, NULL);
1182         talloc_free(nl);
1183
1184         return 0;
1185 }
1186
1187 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1188 {
1189         struct ctdb_client_pid_list *client_pid;
1190
1191         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1192                 if (client_pid->pid == pid) {
1193                         return client_pid->client;
1194                 }
1195         }
1196         return NULL;
1197 }
1198
1199
1200 /* This control is used by samba when probing if a process (of a samba daemon)
1201    exists on the node.
1202    Samba does this when it needs/wants to check if a subrecord in one of the
1203    databases is still valied, or if it is stale and can be removed.
1204    If the node is in unhealthy or stopped state we just kill of the samba
1205    process holding htis sub-record and return to the calling samba that
1206    the process does not exist.
1207    This allows us to forcefully recall subrecords registered by samba processes
1208    on banned and stopped nodes.
1209 */
1210 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1211 {
1212         struct ctdb_client *client;
1213
1214         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1215                 client = ctdb_find_client_by_pid(ctdb, pid);
1216                 if (client != NULL) {
1217                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1218                         talloc_free(client);
1219                 }
1220                 return -1;
1221         }
1222
1223         return kill(pid, 0);
1224 }