f7798671abef93fe84fe786a36e6abf9cc0db522
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb_wrap/tdb_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32 #include "common/reqid.h"
33 #include "common/system.h"
34
35 struct ctdb_client_pid_list {
36         struct ctdb_client_pid_list *next, *prev;
37         struct ctdb_context *ctdb;
38         pid_t pid;
39         struct ctdb_client *client;
40 };
41
42 const char *ctdbd_pidfile = NULL;
43
44 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
45
46 static void print_exit_message(void)
47 {
48         if (debug_extra != NULL && debug_extra[0] != '\0') {
49                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
50         } else {
51                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
52
53                 /* Wait a second to allow pending log messages to be flushed */
54                 sleep(1);
55         }
56 }
57
58
59
60 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
61                                   struct timeval t, void *private_data)
62 {
63         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
64
65         if (getpid() != ctdb->ctdbd_pid) {
66                 return;
67         }
68
69         event_add_timed(ctdb->ev, ctdb, 
70                         timeval_current_ofs(1, 0), 
71                         ctdb_time_tick, ctdb);
72 }
73
74 /* Used to trigger a dummy event once per second, to make
75  * detection of hangs more reliable.
76  */
77 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
78 {
79         event_add_timed(ctdb->ev, ctdb, 
80                         timeval_current_ofs(1, 0), 
81                         ctdb_time_tick, ctdb);
82 }
83
84 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
85 {
86         /* start monitoring for connected/disconnected nodes */
87         ctdb_start_keepalive(ctdb);
88
89         /* start periodic update of tcp tickle lists */
90         ctdb_start_tcp_tickle_update(ctdb);
91
92         /* start listening for recovery daemon pings */
93         ctdb_control_recd_ping(ctdb);
94
95         /* start listening to timer ticks */
96         ctdb_start_time_tickd(ctdb);
97 }
98
99 static void ignore_signal(int signum)
100 {
101         struct sigaction act;
102
103         memset(&act, 0, sizeof(act));
104
105         act.sa_handler = SIG_IGN;
106         sigemptyset(&act.sa_mask);
107         sigaddset(&act.sa_mask, signum);
108         sigaction(signum, &act, NULL);
109 }
110
111
112 /*
113   send a packet to a client
114  */
115 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
116 {
117         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
118         if (hdr->operation == CTDB_REQ_MESSAGE) {
119                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
120                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
121                         talloc_free(client);
122                         return -1;
123                 }
124         }
125         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
126 }
127
128 /*
129   message handler for when we are in daemon mode. This redirects the message
130   to the right client
131  */
132 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
133                                    void *private_data)
134 {
135         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
136         struct ctdb_req_message *r;
137         int len;
138
139         /* construct a message to send to the client containing the data */
140         len = offsetof(struct ctdb_req_message, data) + data.dsize;
141         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
142                                len, struct ctdb_req_message);
143         CTDB_NO_MEMORY_VOID(client->ctdb, r);
144
145         talloc_set_name_const(r, "req_message packet");
146
147         r->srvid         = srvid;
148         r->datalen       = data.dsize;
149         memcpy(&r->data[0], data.dptr, data.dsize);
150
151         daemon_queue_send(client, &r->hdr);
152
153         talloc_free(r);
154 }
155
156 /*
157   this is called when the ctdb daemon received a ctdb request to 
158   set the srvid from the client
159  */
160 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
161 {
162         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
163         int res;
164         if (client == NULL) {
165                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
166                 return -1;
167         }
168         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
169                              client);
170         if (res != 0) {
171                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
172                          (unsigned long long)srvid));
173         } else {
174                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
175                          (unsigned long long)srvid));
176         }
177
178         return res;
179 }
180
181 /*
182   this is called when the ctdb daemon received a ctdb request to 
183   remove a srvid from the client
184  */
185 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
186 {
187         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
188         if (client == NULL) {
189                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
190                 return -1;
191         }
192         return srvid_deregister(ctdb->srv, srvid, client);
193 }
194
195 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
196                         TDB_DATA *outdata)
197 {
198         uint64_t *ids;
199         int i, num_ids;
200         uint8_t *results;
201
202         if ((indata.dsize % sizeof(uint64_t)) != 0) {
203                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
204                                   "size=%d\n", (int)indata.dsize));
205                 return -1;
206         }
207
208         ids = (uint64_t *)indata.dptr;
209         num_ids = indata.dsize / 8;
210
211         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
212         if (results == NULL) {
213                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
214                 return -1;
215         }
216         for (i=0; i<num_ids; i++) {
217                 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
218                         results[i/8] |= (1 << (i%8));
219                 }
220         }
221         outdata->dptr = (uint8_t *)results;
222         outdata->dsize = talloc_get_size(results);
223         return 0;
224 }
225
226 /*
227   destroy a ctdb_client
228 */
229 static int ctdb_client_destructor(struct ctdb_client *client)
230 {
231         struct ctdb_db_context *ctdb_db;
232
233         ctdb_takeover_client_destructor_hook(client);
234         reqid_remove(client->ctdb->idr, client->client_id);
235         client->ctdb->num_clients--;
236
237         if (client->num_persistent_updates != 0) {
238                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
239                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
240         }
241         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
242         if (ctdb_db) {
243                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
244                                   "commit active. Forcing recovery.\n"));
245                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
246
247                 /*
248                  * trans3 transaction state:
249                  *
250                  * The destructor sets the pointer to NULL.
251                  */
252                 talloc_free(ctdb_db->persistent_state);
253         }
254
255         return 0;
256 }
257
258
259 /*
260   this is called when the ctdb daemon received a ctdb request message
261   from a local client over the unix domain socket
262  */
263 static void daemon_request_message_from_client(struct ctdb_client *client, 
264                                                struct ctdb_req_message *c)
265 {
266         TDB_DATA data;
267         int res;
268
269         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
270                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
271         }
272
273         /* maybe the message is for another client on this node */
274         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
275                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
276                 return;
277         }
278
279         /* its for a remote node */
280         data.dptr = &c->data[0];
281         data.dsize = c->datalen;
282         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
283                                        c->srvid, data);
284         if (res != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
286                          c->hdr.destnode));
287         }
288 }
289
290
291 struct daemon_call_state {
292         struct ctdb_client *client;
293         uint32_t reqid;
294         struct ctdb_call *call;
295         struct timeval start_time;
296
297         /* readonly request ? */
298         uint32_t readonly_fetch;
299         uint32_t client_callid;
300 };
301
302 /* 
303    complete a call from a client 
304 */
305 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
306 {
307         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
308                                                            struct daemon_call_state);
309         struct ctdb_reply_call *r;
310         int res;
311         uint32_t length;
312         struct ctdb_client *client = dstate->client;
313         struct ctdb_db_context *ctdb_db = state->ctdb_db;
314
315         talloc_steal(client, dstate);
316         talloc_steal(dstate, dstate->call);
317
318         res = ctdb_daemon_call_recv(state, dstate->call);
319         if (res != 0) {
320                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
321                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
322
323                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
324                 return;
325         }
326
327         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
328         /* If the client asked for readonly FETCH, we remapped this to 
329            FETCH_WITH_HEADER when calling the daemon. So we must
330            strip the extra header off the reply data before passing
331            it back to the client.
332         */
333         if (dstate->readonly_fetch
334         && dstate->client_callid == CTDB_FETCH_FUNC) {
335                 length -= sizeof(struct ctdb_ltdb_header);
336         }
337
338         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
339                                length, struct ctdb_reply_call);
340         if (r == NULL) {
341                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
342                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
344                 return;
345         }
346         r->hdr.reqid        = dstate->reqid;
347         r->status           = dstate->call->status;
348
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 /* client only asked for a FETCH so we must strip off
352                    the extra ctdb_ltdb header
353                 */
354                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
356         } else {
357                 r->datalen          = dstate->call->reply_data.dsize;
358                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
359         }
360
361         res = daemon_queue_send(client, &r->hdr);
362         if (res == -1) {
363                 /* client is dead - return immediately */
364                 return;
365         }
366         if (res != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
368         }
369         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
370         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
371         talloc_free(dstate);
372 }
373
374 struct ctdb_daemon_packet_wrap {
375         struct ctdb_context *ctdb;
376         uint32_t client_id;
377 };
378
379 /*
380   a wrapper to catch disconnected clients
381  */
382 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
383 {
384         struct ctdb_client *client;
385         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
386                                                             struct ctdb_daemon_packet_wrap);
387         if (w == NULL) {
388                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
389                 return;
390         }
391
392         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
393         if (client == NULL) {
394                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
395                          w->client_id));
396                 talloc_free(w);
397                 return;
398         }
399         talloc_free(w);
400
401         /* process it */
402         daemon_incoming_packet(client, hdr);    
403 }
404
405 struct ctdb_deferred_fetch_call {
406         struct ctdb_deferred_fetch_call *next, *prev;
407         struct ctdb_req_call *c;
408         struct ctdb_daemon_packet_wrap *w;
409 };
410
411 struct ctdb_deferred_fetch_queue {
412         struct ctdb_deferred_fetch_call *deferred_calls;
413 };
414
415 struct ctdb_deferred_requeue {
416         struct ctdb_deferred_fetch_call *dfc;
417         struct ctdb_client *client;
418 };
419
420 /* called from a timer event and starts reprocessing the deferred call.*/
421 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
422                                        struct timeval t, void *private_data)
423 {
424         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
425         struct ctdb_client *client = dfr->client;
426
427         talloc_steal(client, dfr->dfc->c);
428         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
429         talloc_free(dfr);
430 }
431
432 /* the referral context is destroyed either after a timeout or when the initial
433    fetch-lock has finished.
434    at this stage, immediately start reprocessing the queued up deferred
435    calls so they get reprocessed immediately (and since we are dmaster at
436    this stage, trigger the waiting smbd processes to pick up and aquire the
437    record right away.
438 */
439 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
440 {
441
442         /* need to reprocess the packets from the queue explicitely instead of
443            just using a normal destructor since we want, need, to
444            call the clients in the same oder as the requests queued up
445         */
446         while (dfq->deferred_calls != NULL) {
447                 struct ctdb_client *client;
448                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
449                 struct ctdb_deferred_requeue *dfr;
450
451                 DLIST_REMOVE(dfq->deferred_calls, dfc);
452
453                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
454                 if (client == NULL) {
455                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
456                                  dfc->w->client_id));
457                         continue;
458                 }
459
460                 /* process it by pushing it back onto the eventloop */
461                 dfr = talloc(client, struct ctdb_deferred_requeue);
462                 if (dfr == NULL) {
463                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
464                         continue;
465                 }
466
467                 dfr->dfc    = talloc_steal(dfr, dfc);
468                 dfr->client = client;
469
470                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
471         }
472
473         return 0;
474 }
475
476 /* insert the new deferral context into the rb tree.
477    there should never be a pre-existing context here, but check for it
478    warn and destroy the previous context if there is already a deferral context
479    for this key.
480 */
481 static void *insert_dfq_callback(void *parm, void *data)
482 {
483         if (data) {
484                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
485                 talloc_free(data);
486         }
487         return parm;
488 }
489
490 /* if the original fetch-lock did not complete within a reasonable time,
491    free the context and context for all deferred requests to cause them to be
492    re-inserted into the event system.
493 */
494 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
495                                   struct timeval t, void *private_data)
496 {
497         talloc_free(private_data);
498 }
499
500 /* This function is used in the local daemon to register a KEY in a database
501    for being "fetched"
502    While the remote fetch is in-flight, any futher attempts to re-fetch the
503    same record will be deferred until the fetch completes.
504 */
505 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
506 {
507         uint32_t *k;
508         struct ctdb_deferred_fetch_queue *dfq;
509
510         k = ctdb_key_to_idkey(call, call->key);
511         if (k == NULL) {
512                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
513                 return -1;
514         }
515
516         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
517         if (dfq == NULL) {
518                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
519                 talloc_free(k);
520                 return -1;
521         }
522         dfq->deferred_calls = NULL;
523
524         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
525
526         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
527
528         /* if the fetch havent completed in 30 seconds, just tear it all down
529            and let it try again as the events are reissued */
530         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
531
532         talloc_free(k);
533         return 0;
534 }
535
536 /* check if this is a duplicate request to a fetch already in-flight
537    if it is, make this call deferred to be reprocessed later when
538    the in-flight fetch completes.
539 */
540 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
541 {
542         uint32_t *k;
543         struct ctdb_deferred_fetch_queue *dfq;
544         struct ctdb_deferred_fetch_call *dfc;
545
546         k = ctdb_key_to_idkey(c, key);
547         if (k == NULL) {
548                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
549                 return -1;
550         }
551
552         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
553         if (dfq == NULL) {
554                 talloc_free(k);
555                 return -1;
556         }
557
558
559         talloc_free(k);
560
561         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
562         if (dfc == NULL) {
563                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
564                 return -1;
565         }
566
567         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
568         if (dfc->w == NULL) {
569                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
570                 talloc_free(dfc);
571                 return -1;
572         }
573
574         dfc->c = talloc_steal(dfc, c);
575         dfc->w->ctdb = ctdb_db->ctdb;
576         dfc->w->client_id = client->client_id;
577
578         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
579
580         return 0;
581 }
582
583
584 /*
585   this is called when the ctdb daemon received a ctdb request call
586   from a local client over the unix domain socket
587  */
588 static void daemon_request_call_from_client(struct ctdb_client *client, 
589                                             struct ctdb_req_call *c)
590 {
591         struct ctdb_call_state *state;
592         struct ctdb_db_context *ctdb_db;
593         struct daemon_call_state *dstate;
594         struct ctdb_call *call;
595         struct ctdb_ltdb_header header;
596         TDB_DATA key, data;
597         int ret;
598         struct ctdb_context *ctdb = client->ctdb;
599         struct ctdb_daemon_packet_wrap *w;
600
601         CTDB_INCREMENT_STAT(ctdb, total_calls);
602         CTDB_INCREMENT_STAT(ctdb, pending_calls);
603
604         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
605         if (!ctdb_db) {
606                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
607                           c->db_id));
608                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
609                 return;
610         }
611
612         if (ctdb_db->unhealthy_reason) {
613                 /*
614                  * this is just a warning, as the tdb should be empty anyway,
615                  * and only persistent databases can be unhealthy, which doesn't
616                  * use this code patch
617                  */
618                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
619                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
620         }
621
622         key.dptr = c->data;
623         key.dsize = c->keylen;
624
625         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
626         CTDB_NO_MEMORY_VOID(ctdb, w);   
627
628         w->ctdb = ctdb;
629         w->client_id = client->client_id;
630
631         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
632                                            (struct ctdb_req_header *)c, &data,
633                                            daemon_incoming_packet_wrap, w, true);
634         if (ret == -2) {
635                 /* will retry later */
636                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
637                 return;
638         }
639
640         talloc_free(w);
641
642         if (ret != 0) {
643                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
644                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
645                 return;
646         }
647
648
649         /* check if this fetch request is a duplicate for a
650            request we already have in flight. If so defer it until
651            the first request completes.
652         */
653         if (ctdb->tunable.fetch_collapse == 1) {
654                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
655                         ret = ctdb_ltdb_unlock(ctdb_db, key);
656                         if (ret != 0) {
657                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
658                         }
659                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
660                         return;
661                 }
662         }
663
664         /* Dont do READONLY if we dont have a tracking database */
665         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
666                 c->flags &= ~CTDB_WANT_READONLY;
667         }
668
669         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
670                 header.flags &= ~CTDB_REC_RO_FLAGS;
671                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
672                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
673                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
674                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
675                 }
676                 /* and clear out the tracking data */
677                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
678                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
679                 }
680         }
681
682         /* if we are revoking, we must defer all other calls until the revoke
683          * had completed.
684          */
685         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
686                 talloc_free(data.dptr);
687                 ret = ctdb_ltdb_unlock(ctdb_db, key);
688
689                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
690                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
691                 }
692                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
693                 return;
694         }
695
696         if ((header.dmaster == ctdb->pnn)
697         && (!(c->flags & CTDB_WANT_READONLY))
698         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
699                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
700                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
701                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
702                 }
703                 ret = ctdb_ltdb_unlock(ctdb_db, key);
704
705                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
706                         ctdb_fatal(ctdb, "Failed to start record revoke");
707                 }
708                 talloc_free(data.dptr);
709
710                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
711                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
712                 }
713
714                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
715                 return;
716         }               
717
718         dstate = talloc(client, struct daemon_call_state);
719         if (dstate == NULL) {
720                 ret = ctdb_ltdb_unlock(ctdb_db, key);
721                 if (ret != 0) {
722                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
723                 }
724
725                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
726                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
727                 return;
728         }
729         dstate->start_time = timeval_current();
730         dstate->client = client;
731         dstate->reqid  = c->hdr.reqid;
732         talloc_steal(dstate, data.dptr);
733
734         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
735         if (call == NULL) {
736                 ret = ctdb_ltdb_unlock(ctdb_db, key);
737                 if (ret != 0) {
738                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
739                 }
740
741                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
742                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
743                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
744                 return;
745         }
746
747         dstate->readonly_fetch = 0;
748         call->call_id = c->callid;
749         call->key = key;
750         call->call_data.dptr = c->data + c->keylen;
751         call->call_data.dsize = c->calldatalen;
752         call->flags = c->flags;
753
754         if (c->flags & CTDB_WANT_READONLY) {
755                 /* client wants readonly record, so translate this into a 
756                    fetch with header. remember what the client asked for
757                    so we can remap the reply back to the proper format for
758                    the client in the reply
759                  */
760                 dstate->client_callid = call->call_id;
761                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
762                 dstate->readonly_fetch = 1;
763         }
764
765         if (header.dmaster == ctdb->pnn) {
766                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
767         } else {
768                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
769                 if (ctdb->tunable.fetch_collapse == 1) {
770                         /* This request triggered a remote fetch-lock.
771                            set up a deferral for this key so any additional
772                            fetch-locks are deferred until the current one
773                            finishes.
774                          */
775                         setup_deferred_fetch_locks(ctdb_db, call);
776                 }
777         }
778
779         ret = ctdb_ltdb_unlock(ctdb_db, key);
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
782         }
783
784         if (state == NULL) {
785                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
786                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
787                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
788                 return;
789         }
790         talloc_steal(state, dstate);
791         talloc_steal(client, state);
792
793         state->async.fn = daemon_call_from_client_callback;
794         state->async.private_data = dstate;
795 }
796
797
798 static void daemon_request_control_from_client(struct ctdb_client *client, 
799                                                struct ctdb_req_control *c);
800
801 /* data contains a packet from the client */
802 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
803 {
804         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
805         TALLOC_CTX *tmp_ctx;
806         struct ctdb_context *ctdb = client->ctdb;
807
808         /* place the packet as a child of a tmp_ctx. We then use
809            talloc_free() below to free it. If any of the calls want
810            to keep it, then they will steal it somewhere else, and the
811            talloc_free() will be a no-op */
812         tmp_ctx = talloc_new(client);
813         talloc_steal(tmp_ctx, hdr);
814
815         if (hdr->ctdb_magic != CTDB_MAGIC) {
816                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
817                 goto done;
818         }
819
820         if (hdr->ctdb_version != CTDB_PROTOCOL) {
821                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
822                 goto done;
823         }
824
825         switch (hdr->operation) {
826         case CTDB_REQ_CALL:
827                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
828                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
829                 break;
830
831         case CTDB_REQ_MESSAGE:
832                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
833                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
834                 break;
835
836         case CTDB_REQ_CONTROL:
837                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
838                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
839                 break;
840
841         default:
842                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
843                          hdr->operation));
844         }
845
846 done:
847         talloc_free(tmp_ctx);
848 }
849
850 /*
851   called when the daemon gets a incoming packet
852  */
853 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
854 {
855         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
856         struct ctdb_req_header *hdr;
857
858         if (cnt == 0) {
859                 talloc_free(client);
860                 return;
861         }
862
863         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
864
865         if (cnt < sizeof(*hdr)) {
866                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
867                                (unsigned)cnt);
868                 return;
869         }
870         hdr = (struct ctdb_req_header *)data;
871         if (cnt != hdr->length) {
872                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
873                                (unsigned)hdr->length, (unsigned)cnt);
874                 return;
875         }
876
877         if (hdr->ctdb_magic != CTDB_MAGIC) {
878                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
879                 return;
880         }
881
882         if (hdr->ctdb_version != CTDB_PROTOCOL) {
883                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
884                 return;
885         }
886
887         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
888                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
889                  hdr->srcnode, hdr->destnode));
890
891         /* it is the responsibility of the incoming packet function to free 'data' */
892         daemon_incoming_packet(client, hdr);
893 }
894
895
896 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
897 {
898         if (client_pid->ctdb->client_pids != NULL) {
899                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
900         }
901
902         return 0;
903 }
904
905
906 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
907                          uint16_t flags, void *private_data)
908 {
909         struct sockaddr_un addr;
910         socklen_t len;
911         int fd;
912         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
913         struct ctdb_client *client;
914         struct ctdb_client_pid_list *client_pid;
915         pid_t peer_pid = 0;
916
917         memset(&addr, 0, sizeof(addr));
918         len = sizeof(addr);
919         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
920         if (fd == -1) {
921                 return;
922         }
923
924         set_nonblocking(fd);
925         set_close_on_exec(fd);
926
927         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
928
929         client = talloc_zero(ctdb, struct ctdb_client);
930         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
931                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
932         }
933
934         client->ctdb = ctdb;
935         client->fd = fd;
936         client->client_id = reqid_new(ctdb->idr, client);
937         client->pid = peer_pid;
938
939         client_pid = talloc(client, struct ctdb_client_pid_list);
940         if (client_pid == NULL) {
941                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
942                 close(fd);
943                 talloc_free(client);
944                 return;
945         }               
946         client_pid->ctdb   = ctdb;
947         client_pid->pid    = peer_pid;
948         client_pid->client = client;
949
950         DLIST_ADD(ctdb->client_pids, client_pid);
951
952         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
953                                          ctdb_daemon_read_cb, client,
954                                          "client-%u", client->pid);
955
956         talloc_set_destructor(client, ctdb_client_destructor);
957         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
958         ctdb->num_clients++;
959 }
960
961
962
963 /*
964   create a unix domain socket and bind it
965   return a file descriptor open on the socket 
966 */
967 static int ux_socket_bind(struct ctdb_context *ctdb)
968 {
969         struct sockaddr_un addr;
970
971         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
972         if (ctdb->daemon.sd == -1) {
973                 return -1;
974         }
975
976         memset(&addr, 0, sizeof(addr));
977         addr.sun_family = AF_UNIX;
978         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
979
980         /* First check if an old ctdbd might be running */
981         if (connect(ctdb->daemon.sd,
982                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
983                 DEBUG(DEBUG_CRIT,
984                       ("Something is already listening on ctdb socket '%s'\n",
985                        ctdb->daemon.name));
986                 goto failed;
987         }
988
989         /* Remove any old socket */
990         unlink(ctdb->daemon.name);
991
992         set_close_on_exec(ctdb->daemon.sd);
993         set_nonblocking(ctdb->daemon.sd);
994
995         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
996                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
997                 goto failed;
998         }
999
1000         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1001             chmod(ctdb->daemon.name, 0700) != 0) {
1002                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1003                 goto failed;
1004         }
1005
1006
1007         if (listen(ctdb->daemon.sd, 100) != 0) {
1008                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1009                 goto failed;
1010         }
1011
1012         return 0;
1013
1014 failed:
1015         close(ctdb->daemon.sd);
1016         ctdb->daemon.sd = -1;
1017         return -1;      
1018 }
1019
1020 static void initialise_node_flags (struct ctdb_context *ctdb)
1021 {
1022         if (ctdb->pnn == -1) {
1023                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1024         }
1025
1026         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1027
1028         /* do we start out in DISABLED mode? */
1029         if (ctdb->start_as_disabled != 0) {
1030                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1031                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1032         }
1033         /* do we start out in STOPPED mode? */
1034         if (ctdb->start_as_stopped != 0) {
1035                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1036                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1037         }
1038 }
1039
1040 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1041                                       void *private_data)
1042 {
1043         if (status != 0) {
1044                 ctdb_die(ctdb, "Failed to run setup event");
1045         }
1046         ctdb_run_notification_script(ctdb, "setup");
1047
1048         /* tell all other nodes we've just started up */
1049         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1050                                  0, CTDB_CONTROL_STARTUP, 0,
1051                                  CTDB_CTRL_FLAG_NOREPLY,
1052                                  tdb_null, NULL, NULL);
1053
1054         /* Start the recovery daemon */
1055         if (ctdb_start_recoverd(ctdb) != 0) {
1056                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1057                 exit(11);
1058         }
1059
1060         ctdb_start_periodic_events(ctdb);
1061
1062         ctdb_wait_for_first_recovery(ctdb);
1063 }
1064
1065 static struct timeval tevent_before_wait_ts;
1066 static struct timeval tevent_after_wait_ts;
1067
1068 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1069                               void *private_data)
1070 {
1071         struct timeval diff;
1072         struct timeval now;
1073         struct ctdb_context *ctdb =
1074                 talloc_get_type(private_data, struct ctdb_context);
1075
1076         if (getpid() != ctdb->ctdbd_pid) {
1077                 return;
1078         }
1079
1080         now = timeval_current();
1081
1082         switch (tp) {
1083         case TEVENT_TRACE_BEFORE_WAIT:
1084                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1085                         diff = timeval_until(&tevent_after_wait_ts, &now);
1086                         if (diff.tv_sec > 3) {
1087                                 DEBUG(DEBUG_ERR,
1088                                       ("Handling event took %ld seconds!\n",
1089                                        (long)diff.tv_sec));
1090                         }
1091                 }
1092                 tevent_before_wait_ts = now;
1093                 break;
1094
1095         case TEVENT_TRACE_AFTER_WAIT:
1096                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1097                         diff = timeval_until(&tevent_before_wait_ts, &now);
1098                         if (diff.tv_sec > 3) {
1099                                 DEBUG(DEBUG_CRIT,
1100                                       ("No event for %ld seconds!\n",
1101                                        (long)diff.tv_sec));
1102                         }
1103                 }
1104                 tevent_after_wait_ts = now;
1105                 break;
1106
1107         default:
1108                 /* Do nothing for future tevent trace points */ ;
1109         }
1110 }
1111
1112 static void ctdb_remove_pidfile(void)
1113 {
1114         /* Only the main ctdbd's PID matches the SID */
1115         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1116                 if (unlink(ctdbd_pidfile) == 0) {
1117                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1118                                              ctdbd_pidfile));
1119                 } else {
1120                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1121                                               ctdbd_pidfile));
1122                 }
1123         }
1124 }
1125
1126 static void ctdb_create_pidfile(pid_t pid)
1127 {
1128         if (ctdbd_pidfile != NULL) {
1129                 FILE *fp;
1130
1131                 fp = fopen(ctdbd_pidfile, "w");
1132                 if (fp == NULL) {
1133                         DEBUG(DEBUG_ALERT,
1134                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1135                         exit(11);
1136                 }
1137
1138                 fprintf(fp, "%d\n", pid);
1139                 fclose(fp);
1140                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1141                 atexit(ctdb_remove_pidfile);
1142         }
1143 }
1144
1145 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1146 {
1147         int i, j, count;
1148
1149         /* initialize the vnn mapping table, skipping any deleted nodes */
1150         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1151         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1152
1153         count = 0;
1154         for (i = 0; i < ctdb->num_nodes; i++) {
1155                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1156                         count++;
1157                 }
1158         }
1159
1160         ctdb->vnn_map->generation = INVALID_GENERATION;
1161         ctdb->vnn_map->size = count;
1162         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1163         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1164
1165         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1166                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1167                         continue;
1168                 }
1169                 ctdb->vnn_map->map[j] = i;
1170                 j++;
1171         }
1172 }
1173
1174 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1175 {
1176         int nodeid;
1177
1178         if (ctdb->address == NULL) {
1179                 ctdb_fatal(ctdb,
1180                            "Can not determine PNN - node address is not set\n");
1181         }
1182
1183         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1184         if (nodeid == -1) {
1185                 ctdb_fatal(ctdb,
1186                            "Can not determine PNN - node address not found in node list\n");
1187         }
1188
1189         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1190         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1191 }
1192
1193 /*
1194   start the protocol going as a daemon
1195 */
1196 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1197 {
1198         int res, ret = -1;
1199         struct fd_event *fde;
1200
1201         /* create a unix domain stream socket to listen to */
1202         res = ux_socket_bind(ctdb);
1203         if (res!=0) {
1204                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1205                 exit(10);
1206         }
1207
1208         if (do_fork && fork()) {
1209                 return 0;
1210         }
1211
1212         tdb_reopen_all(false);
1213
1214         if (do_fork) {
1215                 if (setsid() == -1) {
1216                         ctdb_die(ctdb, "Failed to setsid()\n");
1217                 }
1218                 close(0);
1219                 if (open("/dev/null", O_RDONLY) != 0) {
1220                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1221                         exit(11);
1222                 }
1223         }
1224         ignore_signal(SIGPIPE);
1225         ignore_signal(SIGUSR1);
1226
1227         ctdb->ctdbd_pid = getpid();
1228         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1229                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1230         ctdb_create_pidfile(ctdb->ctdbd_pid);
1231
1232         /* Make sure we log something when the daemon terminates.
1233          * This must be the first exit handler to run (so the last to
1234          * be registered.
1235          */
1236         atexit(print_exit_message);
1237
1238         if (ctdb->do_setsched) {
1239                 /* try to set us up as realtime */
1240                 if (!set_scheduler()) {
1241                         exit(1);
1242                 }
1243                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1244         }
1245
1246         ctdb->ev = event_context_init(NULL);
1247         tevent_loop_allow_nesting(ctdb->ev);
1248         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1249         ret = ctdb_init_tevent_logging(ctdb);
1250         if (ret != 0) {
1251                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1252                 exit(1);
1253         }
1254
1255         /* set up a handler to pick up sigchld */
1256         if (ctdb_init_sigchld(ctdb) == NULL) {
1257                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1258                 exit(1);
1259         }
1260
1261         ctdb_set_child_logging(ctdb);
1262
1263         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1264                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1265                 exit(1);
1266         }
1267
1268         /* initialize statistics collection */
1269         ctdb_statistics_init(ctdb);
1270
1271         /* force initial recovery for election */
1272         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1273
1274         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1275         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1276         if (ret != 0) {
1277                 ctdb_die(ctdb, "Failed to run init event\n");
1278         }
1279         ctdb_run_notification_script(ctdb, "init");
1280
1281         if (strcmp(ctdb->transport, "tcp") == 0) {
1282                 ret = ctdb_tcp_init(ctdb);
1283         }
1284 #ifdef USE_INFINIBAND
1285         if (strcmp(ctdb->transport, "ib") == 0) {
1286                 ret = ctdb_ibw_init(ctdb);
1287         }
1288 #endif
1289         if (ret != 0) {
1290                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1291                 return -1;
1292         }
1293
1294         if (ctdb->methods == NULL) {
1295                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1296                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1297         }
1298
1299         /* Initialise the transport.  This sets the node address if it
1300          * was not set via the command-line. */
1301         if (ctdb->methods->initialise(ctdb) != 0) {
1302                 ctdb_fatal(ctdb, "transport failed to initialise");
1303         }
1304
1305         ctdb_set_my_pnn(ctdb);
1306
1307         initialise_node_flags(ctdb);
1308
1309         if (ctdb->public_addresses_file) {
1310                 ret = ctdb_set_public_addresses(ctdb, true);
1311                 if (ret == -1) {
1312                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1313                         exit(1);
1314                 }
1315         }
1316
1317         ctdb_initialise_vnn_map(ctdb);
1318
1319         /* attach to existing databases */
1320         if (ctdb_attach_databases(ctdb) != 0) {
1321                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1322         }
1323
1324         /* start frozen, then let the first election sort things out */
1325         if (!ctdb_blocking_freeze(ctdb)) {
1326                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1327         }
1328
1329         /* now start accepting clients, only can do this once frozen */
1330         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1331                            EVENT_FD_READ,
1332                            ctdb_accept_client, ctdb);
1333         if (fde == NULL) {
1334                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1335         }
1336         tevent_fd_set_auto_close(fde);
1337
1338         /* Start the transport */
1339         if (ctdb->methods->start(ctdb) != 0) {
1340                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1341                 ctdb_fatal(ctdb, "transport failed to start");
1342         }
1343
1344         /* Recovery daemon and timed events are started from the
1345          * callback, only after the setup event completes
1346          * successfully.
1347          */
1348         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1349         ret = ctdb_event_script_callback(ctdb,
1350                                          ctdb,
1351                                          ctdb_setup_event_callback,
1352                                          ctdb,
1353                                          CTDB_EVENT_SETUP,
1354                                          "%s",
1355                                          "");
1356         if (ret != 0) {
1357                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1358                 exit(1);
1359         }
1360
1361         lockdown_memory(ctdb->valgrinding);
1362
1363         /* go into a wait loop to allow other nodes to complete */
1364         event_loop_wait(ctdb->ev);
1365
1366         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1367         exit(1);
1368 }
1369
1370 /*
1371   allocate a packet for use in daemon<->daemon communication
1372  */
1373 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1374                                                  TALLOC_CTX *mem_ctx, 
1375                                                  enum ctdb_operation operation, 
1376                                                  size_t length, size_t slength,
1377                                                  const char *type)
1378 {
1379         int size;
1380         struct ctdb_req_header *hdr;
1381
1382         length = MAX(length, slength);
1383         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1384
1385         if (ctdb->methods == NULL) {
1386                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1387                          operation, (unsigned)length));
1388                 return NULL;
1389         }
1390
1391         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1392         if (hdr == NULL) {
1393                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1394                          operation, (unsigned)length));
1395                 return NULL;
1396         }
1397         talloc_set_name_const(hdr, type);
1398         memset(hdr, 0, slength);
1399         hdr->length       = length;
1400         hdr->operation    = operation;
1401         hdr->ctdb_magic   = CTDB_MAGIC;
1402         hdr->ctdb_version = CTDB_PROTOCOL;
1403         hdr->generation   = ctdb->vnn_map->generation;
1404         hdr->srcnode      = ctdb->pnn;
1405
1406         return hdr;     
1407 }
1408
1409 struct daemon_control_state {
1410         struct daemon_control_state *next, *prev;
1411         struct ctdb_client *client;
1412         struct ctdb_req_control *c;
1413         uint32_t reqid;
1414         struct ctdb_node *node;
1415 };
1416
1417 /*
1418   callback when a control reply comes in
1419  */
1420 static void daemon_control_callback(struct ctdb_context *ctdb,
1421                                     int32_t status, TDB_DATA data, 
1422                                     const char *errormsg,
1423                                     void *private_data)
1424 {
1425         struct daemon_control_state *state = talloc_get_type(private_data, 
1426                                                              struct daemon_control_state);
1427         struct ctdb_client *client = state->client;
1428         struct ctdb_reply_control *r;
1429         size_t len;
1430         int ret;
1431
1432         /* construct a message to send to the client containing the data */
1433         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1434         if (errormsg) {
1435                 len += strlen(errormsg);
1436         }
1437         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1438                                struct ctdb_reply_control);
1439         CTDB_NO_MEMORY_VOID(ctdb, r);
1440
1441         r->hdr.reqid     = state->reqid;
1442         r->status        = status;
1443         r->datalen       = data.dsize;
1444         r->errorlen = 0;
1445         memcpy(&r->data[0], data.dptr, data.dsize);
1446         if (errormsg) {
1447                 r->errorlen = strlen(errormsg);
1448                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1449         }
1450
1451         ret = daemon_queue_send(client, &r->hdr);
1452         if (ret != -1) {
1453                 talloc_free(state);
1454         }
1455 }
1456
1457 /*
1458   fail all pending controls to a disconnected node
1459  */
1460 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1461 {
1462         struct daemon_control_state *state;
1463         while ((state = node->pending_controls)) {
1464                 DLIST_REMOVE(node->pending_controls, state);
1465                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1466                                         "node is disconnected", state);
1467         }
1468 }
1469
1470 /*
1471   destroy a daemon_control_state
1472  */
1473 static int daemon_control_destructor(struct daemon_control_state *state)
1474 {
1475         if (state->node) {
1476                 DLIST_REMOVE(state->node->pending_controls, state);
1477         }
1478         return 0;
1479 }
1480
1481 /*
1482   this is called when the ctdb daemon received a ctdb request control
1483   from a local client over the unix domain socket
1484  */
1485 static void daemon_request_control_from_client(struct ctdb_client *client, 
1486                                                struct ctdb_req_control *c)
1487 {
1488         TDB_DATA data;
1489         int res;
1490         struct daemon_control_state *state;
1491         TALLOC_CTX *tmp_ctx = talloc_new(client);
1492
1493         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1494                 c->hdr.destnode = client->ctdb->pnn;
1495         }
1496
1497         state = talloc(client, struct daemon_control_state);
1498         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1499
1500         state->client = client;
1501         state->c = talloc_steal(state, c);
1502         state->reqid = c->hdr.reqid;
1503         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1504                 state->node = client->ctdb->nodes[c->hdr.destnode];
1505                 DLIST_ADD(state->node->pending_controls, state);
1506         } else {
1507                 state->node = NULL;
1508         }
1509
1510         talloc_set_destructor(state, daemon_control_destructor);
1511
1512         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1513                 talloc_steal(tmp_ctx, state);
1514         }
1515         
1516         data.dptr = &c->data[0];
1517         data.dsize = c->datalen;
1518         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1519                                        c->srvid, c->opcode, client->client_id,
1520                                        c->flags,
1521                                        data, daemon_control_callback,
1522                                        state);
1523         if (res != 0) {
1524                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1525                          c->hdr.destnode));
1526         }
1527
1528         talloc_free(tmp_ctx);
1529 }
1530
1531 /*
1532   register a call function
1533 */
1534 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1535                          ctdb_fn_t fn, int id)
1536 {
1537         struct ctdb_registered_call *call;
1538         struct ctdb_db_context *ctdb_db;
1539
1540         ctdb_db = find_ctdb_db(ctdb, db_id);
1541         if (ctdb_db == NULL) {
1542                 return -1;
1543         }
1544
1545         call = talloc(ctdb_db, struct ctdb_registered_call);
1546         call->fn = fn;
1547         call->id = id;
1548
1549         DLIST_ADD(ctdb_db->calls, call);        
1550         return 0;
1551 }
1552
1553
1554
1555 /*
1556   this local messaging handler is ugly, but is needed to prevent
1557   recursion in ctdb_send_message() when the destination node is the
1558   same as the source node
1559  */
1560 struct ctdb_local_message {
1561         struct ctdb_context *ctdb;
1562         uint64_t srvid;
1563         TDB_DATA data;
1564 };
1565
1566 static void ctdb_local_message_trigger(struct event_context *ev,
1567                                        struct timed_event *te,
1568                                        struct timeval t, void *private_data)
1569 {
1570         struct ctdb_local_message *m = talloc_get_type(
1571                 private_data, struct ctdb_local_message);
1572
1573         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1574         talloc_free(m);
1575 }
1576
1577 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1578 {
1579         struct ctdb_local_message *m;
1580         m = talloc(ctdb, struct ctdb_local_message);
1581         CTDB_NO_MEMORY(ctdb, m);
1582
1583         m->ctdb = ctdb;
1584         m->srvid = srvid;
1585         m->data  = data;
1586         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1587         if (m->data.dptr == NULL) {
1588                 talloc_free(m);
1589                 return -1;
1590         }
1591
1592         /* this needs to be done as an event to prevent recursion */
1593         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1594         return 0;
1595 }
1596
1597 /*
1598   send a ctdb message
1599 */
1600 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1601                              uint64_t srvid, TDB_DATA data)
1602 {
1603         struct ctdb_req_message *r;
1604         int len;
1605
1606         if (ctdb->methods == NULL) {
1607                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1608                 return -1;
1609         }
1610
1611         /* see if this is a message to ourselves */
1612         if (pnn == ctdb->pnn) {
1613                 return ctdb_local_message(ctdb, srvid, data);
1614         }
1615
1616         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1617         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1618                                     struct ctdb_req_message);
1619         CTDB_NO_MEMORY(ctdb, r);
1620
1621         r->hdr.destnode  = pnn;
1622         r->srvid         = srvid;
1623         r->datalen       = data.dsize;
1624         memcpy(&r->data[0], data.dptr, data.dsize);
1625
1626         ctdb_queue_packet(ctdb, &r->hdr);
1627
1628         talloc_free(r);
1629         return 0;
1630 }
1631
1632
1633
1634 struct ctdb_client_notify_list {
1635         struct ctdb_client_notify_list *next, *prev;
1636         struct ctdb_context *ctdb;
1637         uint64_t srvid;
1638         TDB_DATA data;
1639 };
1640
1641
1642 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1643 {
1644         int ret;
1645
1646         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1647
1648         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1649         if (ret != 0) {
1650                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1651         }
1652
1653         return 0;
1654 }
1655
1656 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1657 {
1658         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1659         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1660         struct ctdb_client_notify_list *nl;
1661
1662         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1663
1664         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1665                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1666                 return -1;
1667         }
1668
1669         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1670                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1671                 return -1;
1672         }
1673
1674
1675         if (client == NULL) {
1676                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1677                 return -1;
1678         }
1679
1680         for(nl=client->notify; nl; nl=nl->next) {
1681                 if (nl->srvid == notify->srvid) {
1682                         break;
1683                 }
1684         }
1685         if (nl != NULL) {
1686                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1687                 return -1;
1688         }
1689
1690         nl = talloc(client, struct ctdb_client_notify_list);
1691         CTDB_NO_MEMORY(ctdb, nl);
1692         nl->ctdb       = ctdb;
1693         nl->srvid      = notify->srvid;
1694         nl->data.dsize = notify->len;
1695         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1696         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1697         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1698         
1699         DLIST_ADD(client->notify, nl);
1700         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1701
1702         return 0;
1703 }
1704
1705 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1706 {
1707         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1708         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1709         struct ctdb_client_notify_list *nl;
1710
1711         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1712
1713         if (client == NULL) {
1714                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1715                 return -1;
1716         }
1717
1718         for(nl=client->notify; nl; nl=nl->next) {
1719                 if (nl->srvid == notify->srvid) {
1720                         break;
1721                 }
1722         }
1723         if (nl == NULL) {
1724                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1725                 return -1;
1726         }
1727
1728         DLIST_REMOVE(client->notify, nl);
1729         talloc_set_destructor(nl, NULL);
1730         talloc_free(nl);
1731
1732         return 0;
1733 }
1734
1735 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1736 {
1737         struct ctdb_client_pid_list *client_pid;
1738
1739         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1740                 if (client_pid->pid == pid) {
1741                         return client_pid->client;
1742                 }
1743         }
1744         return NULL;
1745 }
1746
1747
1748 /* This control is used by samba when probing if a process (of a samba daemon)
1749    exists on the node.
1750    Samba does this when it needs/wants to check if a subrecord in one of the
1751    databases is still valied, or if it is stale and can be removed.
1752    If the node is in unhealthy or stopped state we just kill of the samba
1753    process holding htis sub-record and return to the calling samba that
1754    the process does not exist.
1755    This allows us to forcefully recall subrecords registered by samba processes
1756    on banned and stopped nodes.
1757 */
1758 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1759 {
1760         struct ctdb_client *client;
1761
1762         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1763                 client = ctdb_find_client_by_pid(ctdb, pid);
1764                 if (client != NULL) {
1765                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1766                         talloc_free(client);
1767                 }
1768                 return -1;
1769         }
1770
1771         return kill(pid, 0);
1772 }
1773
1774 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1775 {
1776         struct ctdb_node_map *node_map = NULL;
1777
1778         CHECK_CONTROL_DATA_SIZE(0);
1779
1780         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1781         if (node_map == NULL) {
1782                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1783                 return -1;
1784         }
1785
1786         outdata->dptr  = (unsigned char *)node_map;
1787         outdata->dsize = talloc_get_size(outdata->dptr);
1788
1789         return 0;
1790 }
1791
1792 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1793 {
1794         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1795                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1796                 return;
1797         }
1798
1799         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1800         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1801         ctdb_stop_recoverd(ctdb);
1802         ctdb_stop_keepalive(ctdb);
1803         ctdb_stop_monitoring(ctdb);
1804         ctdb_release_all_ips(ctdb);
1805         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1806         if (ctdb->methods != NULL) {
1807                 ctdb->methods->shutdown(ctdb);
1808         }
1809
1810         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1811         exit(exit_code);
1812 }