ctdb: remove useless setting of variable domain_socket_name
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb_wrap/tdb_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 const char *ctdbd_pidfile = NULL;
41
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
43
44 static void print_exit_message(void)
45 {
46         if (debug_extra != NULL && debug_extra[0] != '\0') {
47                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48         } else {
49                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
50
51                 /* Wait a second to allow pending log messages to be flushed */
52                 sleep(1);
53         }
54 }
55
56
57
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
59                                   struct timeval t, void *private_data)
60 {
61         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
62
63         if (getpid() != ctdb->ctdbd_pid) {
64                 return;
65         }
66
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72 /* Used to trigger a dummy event once per second, to make
73  * detection of hangs more reliable.
74  */
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
76 {
77         event_add_timed(ctdb->ev, ctdb, 
78                         timeval_current_ofs(1, 0), 
79                         ctdb_time_tick, ctdb);
80 }
81
82 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
83 {
84         /* start monitoring for connected/disconnected nodes */
85         ctdb_start_keepalive(ctdb);
86
87         /* start periodic update of tcp tickle lists */
88         ctdb_start_tcp_tickle_update(ctdb);
89
90         /* start listening for recovery daemon pings */
91         ctdb_control_recd_ping(ctdb);
92
93         /* start listening to timer ticks */
94         ctdb_start_time_tickd(ctdb);
95 }
96
97 static void ignore_signal(int signum)
98 {
99         struct sigaction act;
100
101         memset(&act, 0, sizeof(act));
102
103         act.sa_handler = SIG_IGN;
104         sigemptyset(&act.sa_mask);
105         sigaddset(&act.sa_mask, signum);
106         sigaction(signum, &act, NULL);
107 }
108
109
110 /*
111   send a packet to a client
112  */
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
114 {
115         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116         if (hdr->operation == CTDB_REQ_MESSAGE) {
117                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119                         talloc_free(client);
120                         return -1;
121                 }
122         }
123         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
124 }
125
126 /*
127   message handler for when we are in daemon mode. This redirects the message
128   to the right client
129  */
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
131                                     TDB_DATA data, void *private_data)
132 {
133         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134         struct ctdb_req_message *r;
135         int len;
136
137         /* construct a message to send to the client containing the data */
138         len = offsetof(struct ctdb_req_message, data) + data.dsize;
139         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
140                                len, struct ctdb_req_message);
141         CTDB_NO_MEMORY_VOID(ctdb, r);
142
143         talloc_set_name_const(r, "req_message packet");
144
145         r->srvid         = srvid;
146         r->datalen       = data.dsize;
147         memcpy(&r->data[0], data.dptr, data.dsize);
148
149         daemon_queue_send(client, &r->hdr);
150
151         talloc_free(r);
152 }
153
154 /*
155   this is called when the ctdb daemon received a ctdb request to 
156   set the srvid from the client
157  */
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
159 {
160         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161         int res;
162         if (client == NULL) {
163                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164                 return -1;
165         }
166         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167         if (res != 0) {
168                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
169                          (unsigned long long)srvid));
170         } else {
171                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
172                          (unsigned long long)srvid));
173         }
174
175         return res;
176 }
177
178 /*
179   this is called when the ctdb daemon received a ctdb request to 
180   remove a srvid from the client
181  */
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
183 {
184         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185         if (client == NULL) {
186                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187                 return -1;
188         }
189         return ctdb_deregister_message_handler(ctdb, srvid, client);
190 }
191
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193                         TDB_DATA *outdata)
194 {
195         uint64_t *ids;
196         int i, num_ids;
197         uint8_t *results;
198
199         if ((indata.dsize % sizeof(uint64_t)) != 0) {
200                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201                                   "size=%d\n", (int)indata.dsize));
202                 return -1;
203         }
204
205         ids = (uint64_t *)indata.dptr;
206         num_ids = indata.dsize / 8;
207
208         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209         if (results == NULL) {
210                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211                 return -1;
212         }
213         for (i=0; i<num_ids; i++) {
214                 if (ctdb_check_message_handler(ctdb, ids[i])) {
215                         results[i/8] |= (1 << (i%8));
216                 }
217         }
218         outdata->dptr = (uint8_t *)results;
219         outdata->dsize = talloc_get_size(results);
220         return 0;
221 }
222
223 /*
224   destroy a ctdb_client
225 */
226 static int ctdb_client_destructor(struct ctdb_client *client)
227 {
228         struct ctdb_db_context *ctdb_db;
229
230         ctdb_takeover_client_destructor_hook(client);
231         ctdb_reqid_remove(client->ctdb, client->client_id);
232         client->ctdb->num_clients--;
233
234         if (client->num_persistent_updates != 0) {
235                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
237         }
238         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239         if (ctdb_db) {
240                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241                                   "commit active. Forcing recovery.\n"));
242                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
243
244                 /*
245                  * trans3 transaction state:
246                  *
247                  * The destructor sets the pointer to NULL.
248                  */
249                 talloc_free(ctdb_db->persistent_state);
250         }
251
252         return 0;
253 }
254
255
256 /*
257   this is called when the ctdb daemon received a ctdb request message
258   from a local client over the unix domain socket
259  */
260 static void daemon_request_message_from_client(struct ctdb_client *client, 
261                                                struct ctdb_req_message *c)
262 {
263         TDB_DATA data;
264         int res;
265
266         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
267                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
268         }
269
270         /* maybe the message is for another client on this node */
271         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
272                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
273                 return;
274         }
275
276         /* its for a remote node */
277         data.dptr = &c->data[0];
278         data.dsize = c->datalen;
279         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
280                                        c->srvid, data);
281         if (res != 0) {
282                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
283                          c->hdr.destnode));
284         }
285 }
286
287
288 struct daemon_call_state {
289         struct ctdb_client *client;
290         uint32_t reqid;
291         struct ctdb_call *call;
292         struct timeval start_time;
293
294         /* readonly request ? */
295         uint32_t readonly_fetch;
296         uint32_t client_callid;
297 };
298
299 /* 
300    complete a call from a client 
301 */
302 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
303 {
304         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
305                                                            struct daemon_call_state);
306         struct ctdb_reply_call *r;
307         int res;
308         uint32_t length;
309         struct ctdb_client *client = dstate->client;
310         struct ctdb_db_context *ctdb_db = state->ctdb_db;
311
312         talloc_steal(client, dstate);
313         talloc_steal(dstate, dstate->call);
314
315         res = ctdb_daemon_call_recv(state, dstate->call);
316         if (res != 0) {
317                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
318                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
319
320                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
321                 return;
322         }
323
324         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
325         /* If the client asked for readonly FETCH, we remapped this to 
326            FETCH_WITH_HEADER when calling the daemon. So we must
327            strip the extra header off the reply data before passing
328            it back to the client.
329         */
330         if (dstate->readonly_fetch
331         && dstate->client_callid == CTDB_FETCH_FUNC) {
332                 length -= sizeof(struct ctdb_ltdb_header);
333         }
334
335         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
336                                length, struct ctdb_reply_call);
337         if (r == NULL) {
338                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
339                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
340                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
341                 return;
342         }
343         r->hdr.reqid        = dstate->reqid;
344         r->status           = dstate->call->status;
345
346         if (dstate->readonly_fetch
347         && dstate->client_callid == CTDB_FETCH_FUNC) {
348                 /* client only asked for a FETCH so we must strip off
349                    the extra ctdb_ltdb header
350                 */
351                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
352                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
353         } else {
354                 r->datalen          = dstate->call->reply_data.dsize;
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
356         }
357
358         res = daemon_queue_send(client, &r->hdr);
359         if (res == -1) {
360                 /* client is dead - return immediately */
361                 return;
362         }
363         if (res != 0) {
364                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
365         }
366         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
367         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
368         talloc_free(dstate);
369 }
370
371 struct ctdb_daemon_packet_wrap {
372         struct ctdb_context *ctdb;
373         uint32_t client_id;
374 };
375
376 /*
377   a wrapper to catch disconnected clients
378  */
379 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
380 {
381         struct ctdb_client *client;
382         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
383                                                             struct ctdb_daemon_packet_wrap);
384         if (w == NULL) {
385                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
386                 return;
387         }
388
389         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
390         if (client == NULL) {
391                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
392                          w->client_id));
393                 talloc_free(w);
394                 return;
395         }
396         talloc_free(w);
397
398         /* process it */
399         daemon_incoming_packet(client, hdr);    
400 }
401
402 struct ctdb_deferred_fetch_call {
403         struct ctdb_deferred_fetch_call *next, *prev;
404         struct ctdb_req_call *c;
405         struct ctdb_daemon_packet_wrap *w;
406 };
407
408 struct ctdb_deferred_fetch_queue {
409         struct ctdb_deferred_fetch_call *deferred_calls;
410 };
411
412 struct ctdb_deferred_requeue {
413         struct ctdb_deferred_fetch_call *dfc;
414         struct ctdb_client *client;
415 };
416
417 /* called from a timer event and starts reprocessing the deferred call.*/
418 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
419                                        struct timeval t, void *private_data)
420 {
421         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
422         struct ctdb_client *client = dfr->client;
423
424         talloc_steal(client, dfr->dfc->c);
425         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
426         talloc_free(dfr);
427 }
428
429 /* the referral context is destroyed either after a timeout or when the initial
430    fetch-lock has finished.
431    at this stage, immediately start reprocessing the queued up deferred
432    calls so they get reprocessed immediately (and since we are dmaster at
433    this stage, trigger the waiting smbd processes to pick up and aquire the
434    record right away.
435 */
436 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
437 {
438
439         /* need to reprocess the packets from the queue explicitely instead of
440            just using a normal destructor since we want, need, to
441            call the clients in the same oder as the requests queued up
442         */
443         while (dfq->deferred_calls != NULL) {
444                 struct ctdb_client *client;
445                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
446                 struct ctdb_deferred_requeue *dfr;
447
448                 DLIST_REMOVE(dfq->deferred_calls, dfc);
449
450                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
451                 if (client == NULL) {
452                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
453                                  dfc->w->client_id));
454                         continue;
455                 }
456
457                 /* process it by pushing it back onto the eventloop */
458                 dfr = talloc(client, struct ctdb_deferred_requeue);
459                 if (dfr == NULL) {
460                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
461                         continue;
462                 }
463
464                 dfr->dfc    = talloc_steal(dfr, dfc);
465                 dfr->client = client;
466
467                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
468         }
469
470         return 0;
471 }
472
473 /* insert the new deferral context into the rb tree.
474    there should never be a pre-existing context here, but check for it
475    warn and destroy the previous context if there is already a deferral context
476    for this key.
477 */
478 static void *insert_dfq_callback(void *parm, void *data)
479 {
480         if (data) {
481                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
482                 talloc_free(data);
483         }
484         return parm;
485 }
486
487 /* if the original fetch-lock did not complete within a reasonable time,
488    free the context and context for all deferred requests to cause them to be
489    re-inserted into the event system.
490 */
491 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
492                                   struct timeval t, void *private_data)
493 {
494         talloc_free(private_data);
495 }
496
497 /* This function is used in the local daemon to register a KEY in a database
498    for being "fetched"
499    While the remote fetch is in-flight, any futher attempts to re-fetch the
500    same record will be deferred until the fetch completes.
501 */
502 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
503 {
504         uint32_t *k;
505         struct ctdb_deferred_fetch_queue *dfq;
506
507         k = ctdb_key_to_idkey(call, call->key);
508         if (k == NULL) {
509                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
510                 return -1;
511         }
512
513         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
514         if (dfq == NULL) {
515                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
516                 talloc_free(k);
517                 return -1;
518         }
519         dfq->deferred_calls = NULL;
520
521         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
522
523         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
524
525         /* if the fetch havent completed in 30 seconds, just tear it all down
526            and let it try again as the events are reissued */
527         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
528
529         talloc_free(k);
530         return 0;
531 }
532
533 /* check if this is a duplicate request to a fetch already in-flight
534    if it is, make this call deferred to be reprocessed later when
535    the in-flight fetch completes.
536 */
537 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
538 {
539         uint32_t *k;
540         struct ctdb_deferred_fetch_queue *dfq;
541         struct ctdb_deferred_fetch_call *dfc;
542
543         k = ctdb_key_to_idkey(c, key);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
546                 return -1;
547         }
548
549         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
550         if (dfq == NULL) {
551                 talloc_free(k);
552                 return -1;
553         }
554
555
556         talloc_free(k);
557
558         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
559         if (dfc == NULL) {
560                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
561                 return -1;
562         }
563
564         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
565         if (dfc->w == NULL) {
566                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
567                 talloc_free(dfc);
568                 return -1;
569         }
570
571         dfc->c = talloc_steal(dfc, c);
572         dfc->w->ctdb = ctdb_db->ctdb;
573         dfc->w->client_id = client->client_id;
574
575         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
576
577         return 0;
578 }
579
580
581 /*
582   this is called when the ctdb daemon received a ctdb request call
583   from a local client over the unix domain socket
584  */
585 static void daemon_request_call_from_client(struct ctdb_client *client, 
586                                             struct ctdb_req_call *c)
587 {
588         struct ctdb_call_state *state;
589         struct ctdb_db_context *ctdb_db;
590         struct daemon_call_state *dstate;
591         struct ctdb_call *call;
592         struct ctdb_ltdb_header header;
593         TDB_DATA key, data;
594         int ret;
595         struct ctdb_context *ctdb = client->ctdb;
596         struct ctdb_daemon_packet_wrap *w;
597
598         CTDB_INCREMENT_STAT(ctdb, total_calls);
599         CTDB_INCREMENT_STAT(ctdb, pending_calls);
600
601         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
602         if (!ctdb_db) {
603                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
604                           c->db_id));
605                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
606                 return;
607         }
608
609         if (ctdb_db->unhealthy_reason) {
610                 /*
611                  * this is just a warning, as the tdb should be empty anyway,
612                  * and only persistent databases can be unhealthy, which doesn't
613                  * use this code patch
614                  */
615                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
616                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
617         }
618
619         key.dptr = c->data;
620         key.dsize = c->keylen;
621
622         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
623         CTDB_NO_MEMORY_VOID(ctdb, w);   
624
625         w->ctdb = ctdb;
626         w->client_id = client->client_id;
627
628         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
629                                            (struct ctdb_req_header *)c, &data,
630                                            daemon_incoming_packet_wrap, w, true);
631         if (ret == -2) {
632                 /* will retry later */
633                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
634                 return;
635         }
636
637         talloc_free(w);
638
639         if (ret != 0) {
640                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
641                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
642                 return;
643         }
644
645
646         /* check if this fetch request is a duplicate for a
647            request we already have in flight. If so defer it until
648            the first request completes.
649         */
650         if (ctdb->tunable.fetch_collapse == 1) {
651                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
652                         ret = ctdb_ltdb_unlock(ctdb_db, key);
653                         if (ret != 0) {
654                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
655                         }
656                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
657                         return;
658                 }
659         }
660
661         /* Dont do READONLY if we dont have a tracking database */
662         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
663                 c->flags &= ~CTDB_WANT_READONLY;
664         }
665
666         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
667                 header.flags &= ~CTDB_REC_RO_FLAGS;
668                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
669                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
670                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
671                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
672                 }
673                 /* and clear out the tracking data */
674                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
675                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
676                 }
677         }
678
679         /* if we are revoking, we must defer all other calls until the revoke
680          * had completed.
681          */
682         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
683                 talloc_free(data.dptr);
684                 ret = ctdb_ltdb_unlock(ctdb_db, key);
685
686                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
687                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
688                 }
689                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
690                 return;
691         }
692
693         if ((header.dmaster == ctdb->pnn)
694         && (!(c->flags & CTDB_WANT_READONLY))
695         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
696                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
697                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
698                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
699                 }
700                 ret = ctdb_ltdb_unlock(ctdb_db, key);
701
702                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
703                         ctdb_fatal(ctdb, "Failed to start record revoke");
704                 }
705                 talloc_free(data.dptr);
706
707                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
708                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
709                 }
710
711                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
712                 return;
713         }               
714
715         dstate = talloc(client, struct daemon_call_state);
716         if (dstate == NULL) {
717                 ret = ctdb_ltdb_unlock(ctdb_db, key);
718                 if (ret != 0) {
719                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
720                 }
721
722                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
723                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
724                 return;
725         }
726         dstate->start_time = timeval_current();
727         dstate->client = client;
728         dstate->reqid  = c->hdr.reqid;
729         talloc_steal(dstate, data.dptr);
730
731         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
732         if (call == NULL) {
733                 ret = ctdb_ltdb_unlock(ctdb_db, key);
734                 if (ret != 0) {
735                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
736                 }
737
738                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
739                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
740                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
741                 return;
742         }
743
744         dstate->readonly_fetch = 0;
745         call->call_id = c->callid;
746         call->key = key;
747         call->call_data.dptr = c->data + c->keylen;
748         call->call_data.dsize = c->calldatalen;
749         call->flags = c->flags;
750
751         if (c->flags & CTDB_WANT_READONLY) {
752                 /* client wants readonly record, so translate this into a 
753                    fetch with header. remember what the client asked for
754                    so we can remap the reply back to the proper format for
755                    the client in the reply
756                  */
757                 dstate->client_callid = call->call_id;
758                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
759                 dstate->readonly_fetch = 1;
760         }
761
762         if (header.dmaster == ctdb->pnn) {
763                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
764         } else {
765                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
766                 if (ctdb->tunable.fetch_collapse == 1) {
767                         /* This request triggered a remote fetch-lock.
768                            set up a deferral for this key so any additional
769                            fetch-locks are deferred until the current one
770                            finishes.
771                          */
772                         setup_deferred_fetch_locks(ctdb_db, call);
773                 }
774         }
775
776         ret = ctdb_ltdb_unlock(ctdb_db, key);
777         if (ret != 0) {
778                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
779         }
780
781         if (state == NULL) {
782                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
783                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
784                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
785                 return;
786         }
787         talloc_steal(state, dstate);
788         talloc_steal(client, state);
789
790         state->async.fn = daemon_call_from_client_callback;
791         state->async.private_data = dstate;
792 }
793
794
795 static void daemon_request_control_from_client(struct ctdb_client *client, 
796                                                struct ctdb_req_control *c);
797
798 /* data contains a packet from the client */
799 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
800 {
801         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
802         TALLOC_CTX *tmp_ctx;
803         struct ctdb_context *ctdb = client->ctdb;
804
805         /* place the packet as a child of a tmp_ctx. We then use
806            talloc_free() below to free it. If any of the calls want
807            to keep it, then they will steal it somewhere else, and the
808            talloc_free() will be a no-op */
809         tmp_ctx = talloc_new(client);
810         talloc_steal(tmp_ctx, hdr);
811
812         if (hdr->ctdb_magic != CTDB_MAGIC) {
813                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
814                 goto done;
815         }
816
817         if (hdr->ctdb_version != CTDB_PROTOCOL) {
818                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
819                 goto done;
820         }
821
822         switch (hdr->operation) {
823         case CTDB_REQ_CALL:
824                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
825                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
826                 break;
827
828         case CTDB_REQ_MESSAGE:
829                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
830                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
831                 break;
832
833         case CTDB_REQ_CONTROL:
834                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
835                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
836                 break;
837
838         default:
839                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
840                          hdr->operation));
841         }
842
843 done:
844         talloc_free(tmp_ctx);
845 }
846
847 /*
848   called when the daemon gets a incoming packet
849  */
850 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
851 {
852         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
853         struct ctdb_req_header *hdr;
854
855         if (cnt == 0) {
856                 talloc_free(client);
857                 return;
858         }
859
860         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
861
862         if (cnt < sizeof(*hdr)) {
863                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
864                                (unsigned)cnt);
865                 return;
866         }
867         hdr = (struct ctdb_req_header *)data;
868         if (cnt != hdr->length) {
869                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
870                                (unsigned)hdr->length, (unsigned)cnt);
871                 return;
872         }
873
874         if (hdr->ctdb_magic != CTDB_MAGIC) {
875                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
876                 return;
877         }
878
879         if (hdr->ctdb_version != CTDB_PROTOCOL) {
880                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
881                 return;
882         }
883
884         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
885                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
886                  hdr->srcnode, hdr->destnode));
887
888         /* it is the responsibility of the incoming packet function to free 'data' */
889         daemon_incoming_packet(client, hdr);
890 }
891
892
893 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
894 {
895         if (client_pid->ctdb->client_pids != NULL) {
896                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
897         }
898
899         return 0;
900 }
901
902
903 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
904                          uint16_t flags, void *private_data)
905 {
906         struct sockaddr_un addr;
907         socklen_t len;
908         int fd;
909         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
910         struct ctdb_client *client;
911         struct ctdb_client_pid_list *client_pid;
912         pid_t peer_pid = 0;
913
914         memset(&addr, 0, sizeof(addr));
915         len = sizeof(addr);
916         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
917         if (fd == -1) {
918                 return;
919         }
920
921         set_nonblocking(fd);
922         set_close_on_exec(fd);
923
924         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
925
926         client = talloc_zero(ctdb, struct ctdb_client);
927         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
928                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
929         }
930
931         client->ctdb = ctdb;
932         client->fd = fd;
933         client->client_id = ctdb_reqid_new(ctdb, client);
934         client->pid = peer_pid;
935
936         client_pid = talloc(client, struct ctdb_client_pid_list);
937         if (client_pid == NULL) {
938                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
939                 close(fd);
940                 talloc_free(client);
941                 return;
942         }               
943         client_pid->ctdb   = ctdb;
944         client_pid->pid    = peer_pid;
945         client_pid->client = client;
946
947         DLIST_ADD(ctdb->client_pids, client_pid);
948
949         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
950                                          ctdb_daemon_read_cb, client,
951                                          "client-%u", client->pid);
952
953         talloc_set_destructor(client, ctdb_client_destructor);
954         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
955         ctdb->num_clients++;
956 }
957
958
959
960 /*
961   create a unix domain socket and bind it
962   return a file descriptor open on the socket 
963 */
964 static int ux_socket_bind(struct ctdb_context *ctdb)
965 {
966         struct sockaddr_un addr;
967
968         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
969         if (ctdb->daemon.sd == -1) {
970                 return -1;
971         }
972
973         memset(&addr, 0, sizeof(addr));
974         addr.sun_family = AF_UNIX;
975         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
976
977         /* First check if an old ctdbd might be running */
978         if (connect(ctdb->daemon.sd,
979                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
980                 DEBUG(DEBUG_CRIT,
981                       ("Something is already listening on ctdb socket '%s'\n",
982                        ctdb->daemon.name));
983                 goto failed;
984         }
985
986         /* Remove any old socket */
987         unlink(ctdb->daemon.name);
988
989         set_close_on_exec(ctdb->daemon.sd);
990         set_nonblocking(ctdb->daemon.sd);
991
992         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
993                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
994                 goto failed;
995         }
996
997         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
998             chmod(ctdb->daemon.name, 0700) != 0) {
999                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1000                 goto failed;
1001         }
1002
1003
1004         if (listen(ctdb->daemon.sd, 100) != 0) {
1005                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1006                 goto failed;
1007         }
1008
1009         return 0;
1010
1011 failed:
1012         close(ctdb->daemon.sd);
1013         ctdb->daemon.sd = -1;
1014         return -1;      
1015 }
1016
1017 static void initialise_node_flags (struct ctdb_context *ctdb)
1018 {
1019         if (ctdb->pnn == -1) {
1020                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1021         }
1022
1023         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1024
1025         /* do we start out in DISABLED mode? */
1026         if (ctdb->start_as_disabled != 0) {
1027                 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1028                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1029         }
1030         /* do we start out in STOPPED mode? */
1031         if (ctdb->start_as_stopped != 0) {
1032                 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1033                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1034         }
1035 }
1036
1037 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1038                                       void *private_data)
1039 {
1040         if (status != 0) {
1041                 ctdb_die(ctdb, "Failed to run setup event");
1042         }
1043         ctdb_run_notification_script(ctdb, "setup");
1044
1045         /* tell all other nodes we've just started up */
1046         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1047                                  0, CTDB_CONTROL_STARTUP, 0,
1048                                  CTDB_CTRL_FLAG_NOREPLY,
1049                                  tdb_null, NULL, NULL);
1050
1051         /* Start the recovery daemon */
1052         if (ctdb_start_recoverd(ctdb) != 0) {
1053                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1054                 exit(11);
1055         }
1056
1057         ctdb_start_periodic_events(ctdb);
1058
1059         ctdb_wait_for_first_recovery(ctdb);
1060 }
1061
1062 static struct timeval tevent_before_wait_ts;
1063 static struct timeval tevent_after_wait_ts;
1064
1065 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1066                               void *private_data)
1067 {
1068         struct timeval diff;
1069         struct timeval now;
1070         struct ctdb_context *ctdb =
1071                 talloc_get_type(private_data, struct ctdb_context);
1072
1073         if (getpid() != ctdb->ctdbd_pid) {
1074                 return;
1075         }
1076
1077         now = timeval_current();
1078
1079         switch (tp) {
1080         case TEVENT_TRACE_BEFORE_WAIT:
1081                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1082                         diff = timeval_until(&tevent_after_wait_ts, &now);
1083                         if (diff.tv_sec > 3) {
1084                                 DEBUG(DEBUG_ERR,
1085                                       ("Handling event took %ld seconds!\n",
1086                                        diff.tv_sec));
1087                         }
1088                 }
1089                 tevent_before_wait_ts = now;
1090                 break;
1091
1092         case TEVENT_TRACE_AFTER_WAIT:
1093                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1094                         diff = timeval_until(&tevent_before_wait_ts, &now);
1095                         if (diff.tv_sec > 3) {
1096                                 DEBUG(DEBUG_CRIT,
1097                                       ("No event for %ld seconds!\n",
1098                                        diff.tv_sec));
1099                         }
1100                 }
1101                 tevent_after_wait_ts = now;
1102                 break;
1103
1104         default:
1105                 /* Do nothing for future tevent trace points */ ;
1106         }
1107 }
1108
1109 static void ctdb_remove_pidfile(void)
1110 {
1111         /* Only the main ctdbd's PID matches the SID */
1112         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1113                 if (unlink(ctdbd_pidfile) == 0) {
1114                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1115                                              ctdbd_pidfile));
1116                 } else {
1117                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1118                                               ctdbd_pidfile));
1119                 }
1120         }
1121 }
1122
1123 static void ctdb_create_pidfile(pid_t pid)
1124 {
1125         if (ctdbd_pidfile != NULL) {
1126                 FILE *fp;
1127
1128                 fp = fopen(ctdbd_pidfile, "w");
1129                 if (fp == NULL) {
1130                         DEBUG(DEBUG_ALERT,
1131                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1132                         exit(11);
1133                 }
1134
1135                 fprintf(fp, "%d\n", pid);
1136                 fclose(fp);
1137                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1138                 atexit(ctdb_remove_pidfile);
1139         }
1140 }
1141
1142 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1143 {
1144         int i, j, count;
1145
1146         /* initialize the vnn mapping table, skipping any deleted nodes */
1147         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1148         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1149
1150         count = 0;
1151         for (i = 0; i < ctdb->num_nodes; i++) {
1152                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1153                         count++;
1154                 }
1155         }
1156
1157         ctdb->vnn_map->generation = INVALID_GENERATION;
1158         ctdb->vnn_map->size = count;
1159         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1160         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1161
1162         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1163                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1164                         continue;
1165                 }
1166                 ctdb->vnn_map->map[j] = i;
1167                 j++;
1168         }
1169 }
1170
1171 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1172 {
1173         int nodeid;
1174
1175         if (ctdb->address == NULL) {
1176                 ctdb_fatal(ctdb,
1177                            "Can not determine PNN - node address is not set\n");
1178         }
1179
1180         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1181         if (nodeid == -1) {
1182                 ctdb_fatal(ctdb,
1183                            "Can not determine PNN - node address not found in node list\n");
1184         }
1185
1186         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1187         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1188 }
1189
1190 /*
1191   start the protocol going as a daemon
1192 */
1193 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1194 {
1195         int res, ret = -1;
1196         struct fd_event *fde;
1197
1198         /* create a unix domain stream socket to listen to */
1199         res = ux_socket_bind(ctdb);
1200         if (res!=0) {
1201                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1202                 exit(10);
1203         }
1204
1205         if (do_fork && fork()) {
1206                 return 0;
1207         }
1208
1209         tdb_reopen_all(false);
1210
1211         if (do_fork) {
1212                 if (setsid() == -1) {
1213                         ctdb_die(ctdb, "Failed to setsid()\n");
1214                 }
1215                 close(0);
1216                 if (open("/dev/null", O_RDONLY) != 0) {
1217                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1218                         exit(11);
1219                 }
1220         }
1221         ignore_signal(SIGPIPE);
1222
1223         ctdb->ctdbd_pid = getpid();
1224         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1225                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1226         ctdb_create_pidfile(ctdb->ctdbd_pid);
1227
1228         /* Make sure we log something when the daemon terminates.
1229          * This must be the first exit handler to run (so the last to
1230          * be registered.
1231          */
1232         atexit(print_exit_message);
1233
1234         if (ctdb->do_setsched) {
1235                 /* try to set us up as realtime */
1236                 if (!set_scheduler()) {
1237                         exit(1);
1238                 }
1239                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1240         }
1241
1242         ctdb->ev = event_context_init(NULL);
1243         tevent_loop_allow_nesting(ctdb->ev);
1244         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1245         ret = ctdb_init_tevent_logging(ctdb);
1246         if (ret != 0) {
1247                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1248                 exit(1);
1249         }
1250
1251         /* set up a handler to pick up sigchld */
1252         if (ctdb_init_sigchld(ctdb) == NULL) {
1253                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1254                 exit(1);
1255         }
1256
1257         ctdb_set_child_logging(ctdb);
1258
1259         /* initialize statistics collection */
1260         ctdb_statistics_init(ctdb);
1261
1262         /* force initial recovery for election */
1263         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1264
1265         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1266         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1267         if (ret != 0) {
1268                 ctdb_die(ctdb, "Failed to run init event\n");
1269         }
1270         ctdb_run_notification_script(ctdb, "init");
1271
1272         if (strcmp(ctdb->transport, "tcp") == 0) {
1273                 ret = ctdb_tcp_init(ctdb);
1274         }
1275 #ifdef USE_INFINIBAND
1276         if (strcmp(ctdb->transport, "ib") == 0) {
1277                 ret = ctdb_ibw_init(ctdb);
1278         }
1279 #endif
1280         if (ret != 0) {
1281                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1282                 return -1;
1283         }
1284
1285         if (ctdb->methods == NULL) {
1286                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1287                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1288         }
1289
1290         /* Initialise the transport.  This sets the node address if it
1291          * was not set via the command-line. */
1292         if (ctdb->methods->initialise(ctdb) != 0) {
1293                 ctdb_fatal(ctdb, "transport failed to initialise");
1294         }
1295
1296         ctdb_set_my_pnn(ctdb);
1297
1298         initialise_node_flags(ctdb);
1299
1300         if (ctdb->public_addresses_file) {
1301                 ret = ctdb_set_public_addresses(ctdb, true);
1302                 if (ret == -1) {
1303                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1304                         exit(1);
1305                 }
1306         }
1307
1308         ctdb_initialise_vnn_map(ctdb);
1309
1310         /* attach to existing databases */
1311         if (ctdb_attach_databases(ctdb) != 0) {
1312                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1313         }
1314
1315         /* start frozen, then let the first election sort things out */
1316         if (!ctdb_blocking_freeze(ctdb)) {
1317                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1318         }
1319
1320         /* now start accepting clients, only can do this once frozen */
1321         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1322                            EVENT_FD_READ,
1323                            ctdb_accept_client, ctdb);
1324         if (fde == NULL) {
1325                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1326         }
1327         tevent_fd_set_auto_close(fde);
1328
1329         /* Start the transport */
1330         if (ctdb->methods->start(ctdb) != 0) {
1331                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1332                 ctdb_fatal(ctdb, "transport failed to start");
1333         }
1334
1335         /* Recovery daemon and timed events are started from the
1336          * callback, only after the setup event completes
1337          * successfully.
1338          */
1339         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1340         ret = ctdb_event_script_callback(ctdb,
1341                                          ctdb,
1342                                          ctdb_setup_event_callback,
1343                                          ctdb,
1344                                          CTDB_EVENT_SETUP,
1345                                          "%s",
1346                                          "");
1347         if (ret != 0) {
1348                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1349                 exit(1);
1350         }
1351
1352         lockdown_memory(ctdb->valgrinding);
1353
1354         /* go into a wait loop to allow other nodes to complete */
1355         event_loop_wait(ctdb->ev);
1356
1357         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1358         exit(1);
1359 }
1360
1361 /*
1362   allocate a packet for use in daemon<->daemon communication
1363  */
1364 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1365                                                  TALLOC_CTX *mem_ctx, 
1366                                                  enum ctdb_operation operation, 
1367                                                  size_t length, size_t slength,
1368                                                  const char *type)
1369 {
1370         int size;
1371         struct ctdb_req_header *hdr;
1372
1373         length = MAX(length, slength);
1374         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1375
1376         if (ctdb->methods == NULL) {
1377                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1378                          operation, (unsigned)length));
1379                 return NULL;
1380         }
1381
1382         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1383         if (hdr == NULL) {
1384                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1385                          operation, (unsigned)length));
1386                 return NULL;
1387         }
1388         talloc_set_name_const(hdr, type);
1389         memset(hdr, 0, slength);
1390         hdr->length       = length;
1391         hdr->operation    = operation;
1392         hdr->ctdb_magic   = CTDB_MAGIC;
1393         hdr->ctdb_version = CTDB_PROTOCOL;
1394         hdr->generation   = ctdb->vnn_map->generation;
1395         hdr->srcnode      = ctdb->pnn;
1396
1397         return hdr;     
1398 }
1399
1400 struct daemon_control_state {
1401         struct daemon_control_state *next, *prev;
1402         struct ctdb_client *client;
1403         struct ctdb_req_control *c;
1404         uint32_t reqid;
1405         struct ctdb_node *node;
1406 };
1407
1408 /*
1409   callback when a control reply comes in
1410  */
1411 static void daemon_control_callback(struct ctdb_context *ctdb,
1412                                     int32_t status, TDB_DATA data, 
1413                                     const char *errormsg,
1414                                     void *private_data)
1415 {
1416         struct daemon_control_state *state = talloc_get_type(private_data, 
1417                                                              struct daemon_control_state);
1418         struct ctdb_client *client = state->client;
1419         struct ctdb_reply_control *r;
1420         size_t len;
1421         int ret;
1422
1423         /* construct a message to send to the client containing the data */
1424         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1425         if (errormsg) {
1426                 len += strlen(errormsg);
1427         }
1428         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1429                                struct ctdb_reply_control);
1430         CTDB_NO_MEMORY_VOID(ctdb, r);
1431
1432         r->hdr.reqid     = state->reqid;
1433         r->status        = status;
1434         r->datalen       = data.dsize;
1435         r->errorlen = 0;
1436         memcpy(&r->data[0], data.dptr, data.dsize);
1437         if (errormsg) {
1438                 r->errorlen = strlen(errormsg);
1439                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1440         }
1441
1442         ret = daemon_queue_send(client, &r->hdr);
1443         if (ret != -1) {
1444                 talloc_free(state);
1445         }
1446 }
1447
1448 /*
1449   fail all pending controls to a disconnected node
1450  */
1451 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1452 {
1453         struct daemon_control_state *state;
1454         while ((state = node->pending_controls)) {
1455                 DLIST_REMOVE(node->pending_controls, state);
1456                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1457                                         "node is disconnected", state);
1458         }
1459 }
1460
1461 /*
1462   destroy a daemon_control_state
1463  */
1464 static int daemon_control_destructor(struct daemon_control_state *state)
1465 {
1466         if (state->node) {
1467                 DLIST_REMOVE(state->node->pending_controls, state);
1468         }
1469         return 0;
1470 }
1471
1472 /*
1473   this is called when the ctdb daemon received a ctdb request control
1474   from a local client over the unix domain socket
1475  */
1476 static void daemon_request_control_from_client(struct ctdb_client *client, 
1477                                                struct ctdb_req_control *c)
1478 {
1479         TDB_DATA data;
1480         int res;
1481         struct daemon_control_state *state;
1482         TALLOC_CTX *tmp_ctx = talloc_new(client);
1483
1484         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1485                 c->hdr.destnode = client->ctdb->pnn;
1486         }
1487
1488         state = talloc(client, struct daemon_control_state);
1489         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1490
1491         state->client = client;
1492         state->c = talloc_steal(state, c);
1493         state->reqid = c->hdr.reqid;
1494         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1495                 state->node = client->ctdb->nodes[c->hdr.destnode];
1496                 DLIST_ADD(state->node->pending_controls, state);
1497         } else {
1498                 state->node = NULL;
1499         }
1500
1501         talloc_set_destructor(state, daemon_control_destructor);
1502
1503         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1504                 talloc_steal(tmp_ctx, state);
1505         }
1506         
1507         data.dptr = &c->data[0];
1508         data.dsize = c->datalen;
1509         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1510                                        c->srvid, c->opcode, client->client_id,
1511                                        c->flags,
1512                                        data, daemon_control_callback,
1513                                        state);
1514         if (res != 0) {
1515                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1516                          c->hdr.destnode));
1517         }
1518
1519         talloc_free(tmp_ctx);
1520 }
1521
1522 /*
1523   register a call function
1524 */
1525 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1526                          ctdb_fn_t fn, int id)
1527 {
1528         struct ctdb_registered_call *call;
1529         struct ctdb_db_context *ctdb_db;
1530
1531         ctdb_db = find_ctdb_db(ctdb, db_id);
1532         if (ctdb_db == NULL) {
1533                 return -1;
1534         }
1535
1536         call = talloc(ctdb_db, struct ctdb_registered_call);
1537         call->fn = fn;
1538         call->id = id;
1539
1540         DLIST_ADD(ctdb_db->calls, call);        
1541         return 0;
1542 }
1543
1544
1545
1546 /*
1547   this local messaging handler is ugly, but is needed to prevent
1548   recursion in ctdb_send_message() when the destination node is the
1549   same as the source node
1550  */
1551 struct ctdb_local_message {
1552         struct ctdb_context *ctdb;
1553         uint64_t srvid;
1554         TDB_DATA data;
1555 };
1556
1557 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1558                                        struct timeval t, void *private_data)
1559 {
1560         struct ctdb_local_message *m = talloc_get_type(private_data, 
1561                                                        struct ctdb_local_message);
1562         int res;
1563
1564         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1565         if (res != 0) {
1566                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1567                           (unsigned long long)m->srvid));
1568         }
1569         talloc_free(m);
1570 }
1571
1572 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1573 {
1574         struct ctdb_local_message *m;
1575         m = talloc(ctdb, struct ctdb_local_message);
1576         CTDB_NO_MEMORY(ctdb, m);
1577
1578         m->ctdb = ctdb;
1579         m->srvid = srvid;
1580         m->data  = data;
1581         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1582         if (m->data.dptr == NULL) {
1583                 talloc_free(m);
1584                 return -1;
1585         }
1586
1587         /* this needs to be done as an event to prevent recursion */
1588         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1589         return 0;
1590 }
1591
1592 /*
1593   send a ctdb message
1594 */
1595 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1596                              uint64_t srvid, TDB_DATA data)
1597 {
1598         struct ctdb_req_message *r;
1599         int len;
1600
1601         if (ctdb->methods == NULL) {
1602                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1603                 return -1;
1604         }
1605
1606         /* see if this is a message to ourselves */
1607         if (pnn == ctdb->pnn) {
1608                 return ctdb_local_message(ctdb, srvid, data);
1609         }
1610
1611         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1612         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1613                                     struct ctdb_req_message);
1614         CTDB_NO_MEMORY(ctdb, r);
1615
1616         r->hdr.destnode  = pnn;
1617         r->srvid         = srvid;
1618         r->datalen       = data.dsize;
1619         memcpy(&r->data[0], data.dptr, data.dsize);
1620
1621         ctdb_queue_packet(ctdb, &r->hdr);
1622
1623         talloc_free(r);
1624         return 0;
1625 }
1626
1627
1628
1629 struct ctdb_client_notify_list {
1630         struct ctdb_client_notify_list *next, *prev;
1631         struct ctdb_context *ctdb;
1632         uint64_t srvid;
1633         TDB_DATA data;
1634 };
1635
1636
1637 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1638 {
1639         int ret;
1640
1641         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1642
1643         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1644         if (ret != 0) {
1645                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1646         }
1647
1648         return 0;
1649 }
1650
1651 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1652 {
1653         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1654         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1655         struct ctdb_client_notify_list *nl;
1656
1657         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1658
1659         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1660                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1661                 return -1;
1662         }
1663
1664         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1665                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1666                 return -1;
1667         }
1668
1669
1670         if (client == NULL) {
1671                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1672                 return -1;
1673         }
1674
1675         for(nl=client->notify; nl; nl=nl->next) {
1676                 if (nl->srvid == notify->srvid) {
1677                         break;
1678                 }
1679         }
1680         if (nl != NULL) {
1681                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1682                 return -1;
1683         }
1684
1685         nl = talloc(client, struct ctdb_client_notify_list);
1686         CTDB_NO_MEMORY(ctdb, nl);
1687         nl->ctdb       = ctdb;
1688         nl->srvid      = notify->srvid;
1689         nl->data.dsize = notify->len;
1690         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1691         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1692         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1693         
1694         DLIST_ADD(client->notify, nl);
1695         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1696
1697         return 0;
1698 }
1699
1700 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1701 {
1702         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1703         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1704         struct ctdb_client_notify_list *nl;
1705
1706         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1707
1708         if (client == NULL) {
1709                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1710                 return -1;
1711         }
1712
1713         for(nl=client->notify; nl; nl=nl->next) {
1714                 if (nl->srvid == notify->srvid) {
1715                         break;
1716                 }
1717         }
1718         if (nl == NULL) {
1719                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1720                 return -1;
1721         }
1722
1723         DLIST_REMOVE(client->notify, nl);
1724         talloc_set_destructor(nl, NULL);
1725         talloc_free(nl);
1726
1727         return 0;
1728 }
1729
1730 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1731 {
1732         struct ctdb_client_pid_list *client_pid;
1733
1734         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1735                 if (client_pid->pid == pid) {
1736                         return client_pid->client;
1737                 }
1738         }
1739         return NULL;
1740 }
1741
1742
1743 /* This control is used by samba when probing if a process (of a samba daemon)
1744    exists on the node.
1745    Samba does this when it needs/wants to check if a subrecord in one of the
1746    databases is still valied, or if it is stale and can be removed.
1747    If the node is in unhealthy or stopped state we just kill of the samba
1748    process holding htis sub-record and return to the calling samba that
1749    the process does not exist.
1750    This allows us to forcefully recall subrecords registered by samba processes
1751    on banned and stopped nodes.
1752 */
1753 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1754 {
1755         struct ctdb_client *client;
1756
1757         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1758                 client = ctdb_find_client_by_pid(ctdb, pid);
1759                 if (client != NULL) {
1760                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1761                         talloc_free(client);
1762                 }
1763                 return -1;
1764         }
1765
1766         return kill(pid, 0);
1767 }
1768
1769 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1770 {
1771         struct ctdb_node_map *node_map = NULL;
1772
1773         CHECK_CONTROL_DATA_SIZE(0);
1774
1775         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1776         if (node_map == NULL) {
1777                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1778                 return -1;
1779         }
1780
1781         outdata->dptr  = (unsigned char *)node_map;
1782         outdata->dsize = talloc_get_size(outdata->dptr);
1783
1784         return 0;
1785 }
1786
1787 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1788 {
1789         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1790                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1791                 return;
1792         }
1793
1794         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1795         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1796         ctdb_stop_recoverd(ctdb);
1797         ctdb_stop_keepalive(ctdb);
1798         ctdb_stop_monitoring(ctdb);
1799         ctdb_release_all_ips(ctdb);
1800         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1801         if (ctdb->methods != NULL) {
1802                 ctdb->methods->shutdown(ctdb);
1803         }
1804
1805         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1806         exit(exit_code);
1807 }