ctdb-daemon: New control CTDB_CONTROL_GET_NODES_FILE
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb_wrap/tdb_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 const char *ctdbd_pidfile = NULL;
41
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
43
44 static void print_exit_message(void)
45 {
46         if (debug_extra != NULL && debug_extra[0] != '\0') {
47                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48         } else {
49                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
50
51                 /* Wait a second to allow pending log messages to be flushed */
52                 sleep(1);
53         }
54 }
55
56
57
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
59                                   struct timeval t, void *private_data)
60 {
61         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
62
63         if (getpid() != ctdb->ctdbd_pid) {
64                 return;
65         }
66
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72 /* Used to trigger a dummy event once per second, to make
73  * detection of hangs more reliable.
74  */
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
76 {
77         event_add_timed(ctdb->ev, ctdb, 
78                         timeval_current_ofs(1, 0), 
79                         ctdb_time_tick, ctdb);
80 }
81
82 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
83 {
84         /* start monitoring for connected/disconnected nodes */
85         ctdb_start_keepalive(ctdb);
86
87         /* start periodic update of tcp tickle lists */
88         ctdb_start_tcp_tickle_update(ctdb);
89
90         /* start listening for recovery daemon pings */
91         ctdb_control_recd_ping(ctdb);
92
93         /* start listening to timer ticks */
94         ctdb_start_time_tickd(ctdb);
95 }
96
97 static void ignore_signal(int signum)
98 {
99         struct sigaction act;
100
101         memset(&act, 0, sizeof(act));
102
103         act.sa_handler = SIG_IGN;
104         sigemptyset(&act.sa_mask);
105         sigaddset(&act.sa_mask, signum);
106         sigaction(signum, &act, NULL);
107 }
108
109
110 /*
111   send a packet to a client
112  */
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
114 {
115         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116         if (hdr->operation == CTDB_REQ_MESSAGE) {
117                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119                         talloc_free(client);
120                         return -1;
121                 }
122         }
123         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
124 }
125
126 /*
127   message handler for when we are in daemon mode. This redirects the message
128   to the right client
129  */
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
131                                     TDB_DATA data, void *private_data)
132 {
133         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134         struct ctdb_req_message *r;
135         int len;
136
137         /* construct a message to send to the client containing the data */
138         len = offsetof(struct ctdb_req_message, data) + data.dsize;
139         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
140                                len, struct ctdb_req_message);
141         CTDB_NO_MEMORY_VOID(ctdb, r);
142
143         talloc_set_name_const(r, "req_message packet");
144
145         r->srvid         = srvid;
146         r->datalen       = data.dsize;
147         memcpy(&r->data[0], data.dptr, data.dsize);
148
149         daemon_queue_send(client, &r->hdr);
150
151         talloc_free(r);
152 }
153
154 /*
155   this is called when the ctdb daemon received a ctdb request to 
156   set the srvid from the client
157  */
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
159 {
160         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161         int res;
162         if (client == NULL) {
163                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164                 return -1;
165         }
166         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167         if (res != 0) {
168                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
169                          (unsigned long long)srvid));
170         } else {
171                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
172                          (unsigned long long)srvid));
173         }
174
175         return res;
176 }
177
178 /*
179   this is called when the ctdb daemon received a ctdb request to 
180   remove a srvid from the client
181  */
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
183 {
184         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185         if (client == NULL) {
186                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187                 return -1;
188         }
189         return ctdb_deregister_message_handler(ctdb, srvid, client);
190 }
191
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193                         TDB_DATA *outdata)
194 {
195         uint64_t *ids;
196         int i, num_ids;
197         uint8_t *results;
198
199         if ((indata.dsize % sizeof(uint64_t)) != 0) {
200                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201                                   "size=%d\n", (int)indata.dsize));
202                 return -1;
203         }
204
205         ids = (uint64_t *)indata.dptr;
206         num_ids = indata.dsize / 8;
207
208         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209         if (results == NULL) {
210                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211                 return -1;
212         }
213         for (i=0; i<num_ids; i++) {
214                 if (ctdb_check_message_handler(ctdb, ids[i])) {
215                         results[i/8] |= (1 << (i%8));
216                 }
217         }
218         outdata->dptr = (uint8_t *)results;
219         outdata->dsize = talloc_get_size(results);
220         return 0;
221 }
222
223 /*
224   destroy a ctdb_client
225 */
226 static int ctdb_client_destructor(struct ctdb_client *client)
227 {
228         struct ctdb_db_context *ctdb_db;
229
230         ctdb_takeover_client_destructor_hook(client);
231         ctdb_reqid_remove(client->ctdb, client->client_id);
232         client->ctdb->num_clients--;
233
234         if (client->num_persistent_updates != 0) {
235                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
237         }
238         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239         if (ctdb_db) {
240                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241                                   "commit active. Forcing recovery.\n"));
242                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
243
244                 /*
245                  * trans3 transaction state:
246                  *
247                  * The destructor sets the pointer to NULL.
248                  */
249                 talloc_free(ctdb_db->persistent_state);
250         }
251
252         return 0;
253 }
254
255
256 /*
257   this is called when the ctdb daemon received a ctdb request message
258   from a local client over the unix domain socket
259  */
260 static void daemon_request_message_from_client(struct ctdb_client *client, 
261                                                struct ctdb_req_message *c)
262 {
263         TDB_DATA data;
264         int res;
265
266         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
267                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
268         }
269
270         /* maybe the message is for another client on this node */
271         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
272                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
273                 return;
274         }
275
276         /* its for a remote node */
277         data.dptr = &c->data[0];
278         data.dsize = c->datalen;
279         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
280                                        c->srvid, data);
281         if (res != 0) {
282                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
283                          c->hdr.destnode));
284         }
285 }
286
287
288 struct daemon_call_state {
289         struct ctdb_client *client;
290         uint32_t reqid;
291         struct ctdb_call *call;
292         struct timeval start_time;
293
294         /* readonly request ? */
295         uint32_t readonly_fetch;
296         uint32_t client_callid;
297 };
298
299 /* 
300    complete a call from a client 
301 */
302 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
303 {
304         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
305                                                            struct daemon_call_state);
306         struct ctdb_reply_call *r;
307         int res;
308         uint32_t length;
309         struct ctdb_client *client = dstate->client;
310         struct ctdb_db_context *ctdb_db = state->ctdb_db;
311
312         talloc_steal(client, dstate);
313         talloc_steal(dstate, dstate->call);
314
315         res = ctdb_daemon_call_recv(state, dstate->call);
316         if (res != 0) {
317                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
318                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
319
320                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
321                 return;
322         }
323
324         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
325         /* If the client asked for readonly FETCH, we remapped this to 
326            FETCH_WITH_HEADER when calling the daemon. So we must
327            strip the extra header off the reply data before passing
328            it back to the client.
329         */
330         if (dstate->readonly_fetch
331         && dstate->client_callid == CTDB_FETCH_FUNC) {
332                 length -= sizeof(struct ctdb_ltdb_header);
333         }
334
335         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
336                                length, struct ctdb_reply_call);
337         if (r == NULL) {
338                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
339                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
340                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
341                 return;
342         }
343         r->hdr.reqid        = dstate->reqid;
344         r->status           = dstate->call->status;
345
346         if (dstate->readonly_fetch
347         && dstate->client_callid == CTDB_FETCH_FUNC) {
348                 /* client only asked for a FETCH so we must strip off
349                    the extra ctdb_ltdb header
350                 */
351                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
352                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
353         } else {
354                 r->datalen          = dstate->call->reply_data.dsize;
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
356         }
357
358         res = daemon_queue_send(client, &r->hdr);
359         if (res == -1) {
360                 /* client is dead - return immediately */
361                 return;
362         }
363         if (res != 0) {
364                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
365         }
366         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
367         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
368         talloc_free(dstate);
369 }
370
371 struct ctdb_daemon_packet_wrap {
372         struct ctdb_context *ctdb;
373         uint32_t client_id;
374 };
375
376 /*
377   a wrapper to catch disconnected clients
378  */
379 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
380 {
381         struct ctdb_client *client;
382         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
383                                                             struct ctdb_daemon_packet_wrap);
384         if (w == NULL) {
385                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
386                 return;
387         }
388
389         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
390         if (client == NULL) {
391                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
392                          w->client_id));
393                 talloc_free(w);
394                 return;
395         }
396         talloc_free(w);
397
398         /* process it */
399         daemon_incoming_packet(client, hdr);    
400 }
401
402 struct ctdb_deferred_fetch_call {
403         struct ctdb_deferred_fetch_call *next, *prev;
404         struct ctdb_req_call *c;
405         struct ctdb_daemon_packet_wrap *w;
406 };
407
408 struct ctdb_deferred_fetch_queue {
409         struct ctdb_deferred_fetch_call *deferred_calls;
410 };
411
412 struct ctdb_deferred_requeue {
413         struct ctdb_deferred_fetch_call *dfc;
414         struct ctdb_client *client;
415 };
416
417 /* called from a timer event and starts reprocessing the deferred call.*/
418 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
419                                        struct timeval t, void *private_data)
420 {
421         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
422         struct ctdb_client *client = dfr->client;
423
424         talloc_steal(client, dfr->dfc->c);
425         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
426         talloc_free(dfr);
427 }
428
429 /* the referral context is destroyed either after a timeout or when the initial
430    fetch-lock has finished.
431    at this stage, immediately start reprocessing the queued up deferred
432    calls so they get reprocessed immediately (and since we are dmaster at
433    this stage, trigger the waiting smbd processes to pick up and aquire the
434    record right away.
435 */
436 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
437 {
438
439         /* need to reprocess the packets from the queue explicitely instead of
440            just using a normal destructor since we want, need, to
441            call the clients in the same oder as the requests queued up
442         */
443         while (dfq->deferred_calls != NULL) {
444                 struct ctdb_client *client;
445                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
446                 struct ctdb_deferred_requeue *dfr;
447
448                 DLIST_REMOVE(dfq->deferred_calls, dfc);
449
450                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
451                 if (client == NULL) {
452                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
453                                  dfc->w->client_id));
454                         continue;
455                 }
456
457                 /* process it by pushing it back onto the eventloop */
458                 dfr = talloc(client, struct ctdb_deferred_requeue);
459                 if (dfr == NULL) {
460                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
461                         continue;
462                 }
463
464                 dfr->dfc    = talloc_steal(dfr, dfc);
465                 dfr->client = client;
466
467                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
468         }
469
470         return 0;
471 }
472
473 /* insert the new deferral context into the rb tree.
474    there should never be a pre-existing context here, but check for it
475    warn and destroy the previous context if there is already a deferral context
476    for this key.
477 */
478 static void *insert_dfq_callback(void *parm, void *data)
479 {
480         if (data) {
481                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
482                 talloc_free(data);
483         }
484         return parm;
485 }
486
487 /* if the original fetch-lock did not complete within a reasonable time,
488    free the context and context for all deferred requests to cause them to be
489    re-inserted into the event system.
490 */
491 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
492                                   struct timeval t, void *private_data)
493 {
494         talloc_free(private_data);
495 }
496
497 /* This function is used in the local daemon to register a KEY in a database
498    for being "fetched"
499    While the remote fetch is in-flight, any futher attempts to re-fetch the
500    same record will be deferred until the fetch completes.
501 */
502 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
503 {
504         uint32_t *k;
505         struct ctdb_deferred_fetch_queue *dfq;
506
507         k = ctdb_key_to_idkey(call, call->key);
508         if (k == NULL) {
509                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
510                 return -1;
511         }
512
513         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
514         if (dfq == NULL) {
515                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
516                 talloc_free(k);
517                 return -1;
518         }
519         dfq->deferred_calls = NULL;
520
521         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
522
523         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
524
525         /* if the fetch havent completed in 30 seconds, just tear it all down
526            and let it try again as the events are reissued */
527         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
528
529         talloc_free(k);
530         return 0;
531 }
532
533 /* check if this is a duplicate request to a fetch already in-flight
534    if it is, make this call deferred to be reprocessed later when
535    the in-flight fetch completes.
536 */
537 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
538 {
539         uint32_t *k;
540         struct ctdb_deferred_fetch_queue *dfq;
541         struct ctdb_deferred_fetch_call *dfc;
542
543         k = ctdb_key_to_idkey(c, key);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
546                 return -1;
547         }
548
549         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
550         if (dfq == NULL) {
551                 talloc_free(k);
552                 return -1;
553         }
554
555
556         talloc_free(k);
557
558         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
559         if (dfc == NULL) {
560                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
561                 return -1;
562         }
563
564         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
565         if (dfc->w == NULL) {
566                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
567                 talloc_free(dfc);
568                 return -1;
569         }
570
571         dfc->c = talloc_steal(dfc, c);
572         dfc->w->ctdb = ctdb_db->ctdb;
573         dfc->w->client_id = client->client_id;
574
575         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
576
577         return 0;
578 }
579
580
581 /*
582   this is called when the ctdb daemon received a ctdb request call
583   from a local client over the unix domain socket
584  */
585 static void daemon_request_call_from_client(struct ctdb_client *client, 
586                                             struct ctdb_req_call *c)
587 {
588         struct ctdb_call_state *state;
589         struct ctdb_db_context *ctdb_db;
590         struct daemon_call_state *dstate;
591         struct ctdb_call *call;
592         struct ctdb_ltdb_header header;
593         TDB_DATA key, data;
594         int ret;
595         struct ctdb_context *ctdb = client->ctdb;
596         struct ctdb_daemon_packet_wrap *w;
597
598         CTDB_INCREMENT_STAT(ctdb, total_calls);
599         CTDB_INCREMENT_STAT(ctdb, pending_calls);
600
601         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
602         if (!ctdb_db) {
603                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
604                           c->db_id));
605                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
606                 return;
607         }
608
609         if (ctdb_db->unhealthy_reason) {
610                 /*
611                  * this is just a warning, as the tdb should be empty anyway,
612                  * and only persistent databases can be unhealthy, which doesn't
613                  * use this code patch
614                  */
615                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
616                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
617         }
618
619         key.dptr = c->data;
620         key.dsize = c->keylen;
621
622         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
623         CTDB_NO_MEMORY_VOID(ctdb, w);   
624
625         w->ctdb = ctdb;
626         w->client_id = client->client_id;
627
628         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
629                                            (struct ctdb_req_header *)c, &data,
630                                            daemon_incoming_packet_wrap, w, true);
631         if (ret == -2) {
632                 /* will retry later */
633                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
634                 return;
635         }
636
637         talloc_free(w);
638
639         if (ret != 0) {
640                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
641                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
642                 return;
643         }
644
645
646         /* check if this fetch request is a duplicate for a
647            request we already have in flight. If so defer it until
648            the first request completes.
649         */
650         if (ctdb->tunable.fetch_collapse == 1) {
651                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
652                         ret = ctdb_ltdb_unlock(ctdb_db, key);
653                         if (ret != 0) {
654                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
655                         }
656                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
657                         return;
658                 }
659         }
660
661         /* Dont do READONLY if we dont have a tracking database */
662         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
663                 c->flags &= ~CTDB_WANT_READONLY;
664         }
665
666         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
667                 header.flags &= ~CTDB_REC_RO_FLAGS;
668                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
669                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
670                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
671                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
672                 }
673                 /* and clear out the tracking data */
674                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
675                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
676                 }
677         }
678
679         /* if we are revoking, we must defer all other calls until the revoke
680          * had completed.
681          */
682         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
683                 talloc_free(data.dptr);
684                 ret = ctdb_ltdb_unlock(ctdb_db, key);
685
686                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
687                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
688                 }
689                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
690                 return;
691         }
692
693         if ((header.dmaster == ctdb->pnn)
694         && (!(c->flags & CTDB_WANT_READONLY))
695         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
696                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
697                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
698                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
699                 }
700                 ret = ctdb_ltdb_unlock(ctdb_db, key);
701
702                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
703                         ctdb_fatal(ctdb, "Failed to start record revoke");
704                 }
705                 talloc_free(data.dptr);
706
707                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
708                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
709                 }
710
711                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
712                 return;
713         }               
714
715         dstate = talloc(client, struct daemon_call_state);
716         if (dstate == NULL) {
717                 ret = ctdb_ltdb_unlock(ctdb_db, key);
718                 if (ret != 0) {
719                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
720                 }
721
722                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
723                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
724                 return;
725         }
726         dstate->start_time = timeval_current();
727         dstate->client = client;
728         dstate->reqid  = c->hdr.reqid;
729         talloc_steal(dstate, data.dptr);
730
731         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
732         if (call == NULL) {
733                 ret = ctdb_ltdb_unlock(ctdb_db, key);
734                 if (ret != 0) {
735                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
736                 }
737
738                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
739                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
740                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
741                 return;
742         }
743
744         dstate->readonly_fetch = 0;
745         call->call_id = c->callid;
746         call->key = key;
747         call->call_data.dptr = c->data + c->keylen;
748         call->call_data.dsize = c->calldatalen;
749         call->flags = c->flags;
750
751         if (c->flags & CTDB_WANT_READONLY) {
752                 /* client wants readonly record, so translate this into a 
753                    fetch with header. remember what the client asked for
754                    so we can remap the reply back to the proper format for
755                    the client in the reply
756                  */
757                 dstate->client_callid = call->call_id;
758                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
759                 dstate->readonly_fetch = 1;
760         }
761
762         if (header.dmaster == ctdb->pnn) {
763                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
764         } else {
765                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
766                 if (ctdb->tunable.fetch_collapse == 1) {
767                         /* This request triggered a remote fetch-lock.
768                            set up a deferral for this key so any additional
769                            fetch-locks are deferred until the current one
770                            finishes.
771                          */
772                         setup_deferred_fetch_locks(ctdb_db, call);
773                 }
774         }
775
776         ret = ctdb_ltdb_unlock(ctdb_db, key);
777         if (ret != 0) {
778                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
779         }
780
781         if (state == NULL) {
782                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
783                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
784                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
785                 return;
786         }
787         talloc_steal(state, dstate);
788         talloc_steal(client, state);
789
790         state->async.fn = daemon_call_from_client_callback;
791         state->async.private_data = dstate;
792 }
793
794
795 static void daemon_request_control_from_client(struct ctdb_client *client, 
796                                                struct ctdb_req_control *c);
797
798 /* data contains a packet from the client */
799 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
800 {
801         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
802         TALLOC_CTX *tmp_ctx;
803         struct ctdb_context *ctdb = client->ctdb;
804
805         /* place the packet as a child of a tmp_ctx. We then use
806            talloc_free() below to free it. If any of the calls want
807            to keep it, then they will steal it somewhere else, and the
808            talloc_free() will be a no-op */
809         tmp_ctx = talloc_new(client);
810         talloc_steal(tmp_ctx, hdr);
811
812         if (hdr->ctdb_magic != CTDB_MAGIC) {
813                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
814                 goto done;
815         }
816
817         if (hdr->ctdb_version != CTDB_PROTOCOL) {
818                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
819                 goto done;
820         }
821
822         switch (hdr->operation) {
823         case CTDB_REQ_CALL:
824                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
825                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
826                 break;
827
828         case CTDB_REQ_MESSAGE:
829                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
830                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
831                 break;
832
833         case CTDB_REQ_CONTROL:
834                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
835                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
836                 break;
837
838         default:
839                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
840                          hdr->operation));
841         }
842
843 done:
844         talloc_free(tmp_ctx);
845 }
846
847 /*
848   called when the daemon gets a incoming packet
849  */
850 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
851 {
852         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
853         struct ctdb_req_header *hdr;
854
855         if (cnt == 0) {
856                 talloc_free(client);
857                 return;
858         }
859
860         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
861
862         if (cnt < sizeof(*hdr)) {
863                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
864                                (unsigned)cnt);
865                 return;
866         }
867         hdr = (struct ctdb_req_header *)data;
868         if (cnt != hdr->length) {
869                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
870                                (unsigned)hdr->length, (unsigned)cnt);
871                 return;
872         }
873
874         if (hdr->ctdb_magic != CTDB_MAGIC) {
875                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
876                 return;
877         }
878
879         if (hdr->ctdb_version != CTDB_PROTOCOL) {
880                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
881                 return;
882         }
883
884         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
885                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
886                  hdr->srcnode, hdr->destnode));
887
888         /* it is the responsibility of the incoming packet function to free 'data' */
889         daemon_incoming_packet(client, hdr);
890 }
891
892
893 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
894 {
895         if (client_pid->ctdb->client_pids != NULL) {
896                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
897         }
898
899         return 0;
900 }
901
902
903 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
904                          uint16_t flags, void *private_data)
905 {
906         struct sockaddr_un addr;
907         socklen_t len;
908         int fd;
909         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
910         struct ctdb_client *client;
911         struct ctdb_client_pid_list *client_pid;
912         pid_t peer_pid = 0;
913
914         memset(&addr, 0, sizeof(addr));
915         len = sizeof(addr);
916         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
917         if (fd == -1) {
918                 return;
919         }
920
921         set_nonblocking(fd);
922         set_close_on_exec(fd);
923
924         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
925
926         client = talloc_zero(ctdb, struct ctdb_client);
927         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
928                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
929         }
930
931         client->ctdb = ctdb;
932         client->fd = fd;
933         client->client_id = ctdb_reqid_new(ctdb, client);
934         client->pid = peer_pid;
935
936         client_pid = talloc(client, struct ctdb_client_pid_list);
937         if (client_pid == NULL) {
938                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
939                 close(fd);
940                 talloc_free(client);
941                 return;
942         }               
943         client_pid->ctdb   = ctdb;
944         client_pid->pid    = peer_pid;
945         client_pid->client = client;
946
947         DLIST_ADD(ctdb->client_pids, client_pid);
948
949         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
950                                          ctdb_daemon_read_cb, client,
951                                          "client-%u", client->pid);
952
953         talloc_set_destructor(client, ctdb_client_destructor);
954         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
955         ctdb->num_clients++;
956 }
957
958
959
960 /*
961   create a unix domain socket and bind it
962   return a file descriptor open on the socket 
963 */
964 static int ux_socket_bind(struct ctdb_context *ctdb)
965 {
966         struct sockaddr_un addr;
967
968         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
969         if (ctdb->daemon.sd == -1) {
970                 return -1;
971         }
972
973         memset(&addr, 0, sizeof(addr));
974         addr.sun_family = AF_UNIX;
975         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
976
977         /* First check if an old ctdbd might be running */
978         if (connect(ctdb->daemon.sd,
979                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
980                 DEBUG(DEBUG_CRIT,
981                       ("Something is already listening on ctdb socket '%s'\n",
982                        ctdb->daemon.name));
983                 goto failed;
984         }
985
986         /* Remove any old socket */
987         unlink(ctdb->daemon.name);
988
989         set_close_on_exec(ctdb->daemon.sd);
990         set_nonblocking(ctdb->daemon.sd);
991
992         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
993                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
994                 goto failed;
995         }
996
997         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
998             chmod(ctdb->daemon.name, 0700) != 0) {
999                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1000                 goto failed;
1001         }
1002
1003
1004         if (listen(ctdb->daemon.sd, 100) != 0) {
1005                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1006                 goto failed;
1007         }
1008
1009         return 0;
1010
1011 failed:
1012         close(ctdb->daemon.sd);
1013         ctdb->daemon.sd = -1;
1014         return -1;      
1015 }
1016
1017 static void initialise_node_flags (struct ctdb_context *ctdb)
1018 {
1019         if (ctdb->pnn == -1) {
1020                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1021         }
1022
1023         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1024
1025         /* do we start out in DISABLED mode? */
1026         if (ctdb->start_as_disabled != 0) {
1027                 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1028                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1029         }
1030         /* do we start out in STOPPED mode? */
1031         if (ctdb->start_as_stopped != 0) {
1032                 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1033                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1034         }
1035 }
1036
1037 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1038                                       void *private_data)
1039 {
1040         if (status != 0) {
1041                 ctdb_die(ctdb, "Failed to run setup event");
1042         }
1043         ctdb_run_notification_script(ctdb, "setup");
1044
1045         /* tell all other nodes we've just started up */
1046         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1047                                  0, CTDB_CONTROL_STARTUP, 0,
1048                                  CTDB_CTRL_FLAG_NOREPLY,
1049                                  tdb_null, NULL, NULL);
1050
1051         /* Start the recovery daemon */
1052         if (ctdb_start_recoverd(ctdb) != 0) {
1053                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1054                 exit(11);
1055         }
1056
1057         ctdb_start_periodic_events(ctdb);
1058
1059         ctdb_wait_for_first_recovery(ctdb);
1060 }
1061
1062 static struct timeval tevent_before_wait_ts;
1063 static struct timeval tevent_after_wait_ts;
1064
1065 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1066                               void *private_data)
1067 {
1068         struct timeval diff;
1069         struct timeval now;
1070         struct ctdb_context *ctdb =
1071                 talloc_get_type(private_data, struct ctdb_context);
1072
1073         if (getpid() != ctdb->ctdbd_pid) {
1074                 return;
1075         }
1076
1077         now = timeval_current();
1078
1079         switch (tp) {
1080         case TEVENT_TRACE_BEFORE_WAIT:
1081                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1082                         diff = timeval_until(&tevent_after_wait_ts, &now);
1083                         if (diff.tv_sec > 3) {
1084                                 DEBUG(DEBUG_ERR,
1085                                       ("Handling event took %ld seconds!\n",
1086                                        diff.tv_sec));
1087                         }
1088                 }
1089                 tevent_before_wait_ts = now;
1090                 break;
1091
1092         case TEVENT_TRACE_AFTER_WAIT:
1093                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1094                         diff = timeval_until(&tevent_before_wait_ts, &now);
1095                         if (diff.tv_sec > 3) {
1096                                 DEBUG(DEBUG_CRIT,
1097                                       ("No event for %ld seconds!\n",
1098                                        diff.tv_sec));
1099                         }
1100                 }
1101                 tevent_after_wait_ts = now;
1102                 break;
1103
1104         default:
1105                 /* Do nothing for future tevent trace points */ ;
1106         }
1107 }
1108
1109 static void ctdb_remove_pidfile(void)
1110 {
1111         /* Only the main ctdbd's PID matches the SID */
1112         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1113                 if (unlink(ctdbd_pidfile) == 0) {
1114                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1115                                              ctdbd_pidfile));
1116                 } else {
1117                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1118                                               ctdbd_pidfile));
1119                 }
1120         }
1121 }
1122
1123 static void ctdb_create_pidfile(pid_t pid)
1124 {
1125         if (ctdbd_pidfile != NULL) {
1126                 FILE *fp;
1127
1128                 fp = fopen(ctdbd_pidfile, "w");
1129                 if (fp == NULL) {
1130                         DEBUG(DEBUG_ALERT,
1131                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1132                         exit(11);
1133                 }
1134
1135                 fprintf(fp, "%d\n", pid);
1136                 fclose(fp);
1137                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1138                 atexit(ctdb_remove_pidfile);
1139         }
1140 }
1141
1142 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1143 {
1144         int i, j, count;
1145
1146         /* initialize the vnn mapping table, skipping any deleted nodes */
1147         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1148         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1149
1150         count = 0;
1151         for (i = 0; i < ctdb->num_nodes; i++) {
1152                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1153                         count++;
1154                 }
1155         }
1156
1157         ctdb->vnn_map->generation = INVALID_GENERATION;
1158         ctdb->vnn_map->size = count;
1159         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1160         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1161
1162         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1163                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1164                         continue;
1165                 }
1166                 ctdb->vnn_map->map[j] = i;
1167                 j++;
1168         }
1169 }
1170
1171 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1172 {
1173         int nodeid;
1174
1175         if (ctdb->address == NULL) {
1176                 ctdb_fatal(ctdb,
1177                            "Can not determine PNN - node address is not set\n");
1178         }
1179
1180         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1181         if (nodeid == -1) {
1182                 ctdb_fatal(ctdb,
1183                            "Can not determine PNN - node address not found in node list\n");
1184         }
1185
1186         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1187         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1188 }
1189
1190 /*
1191   start the protocol going as a daemon
1192 */
1193 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1194 {
1195         int res, ret = -1;
1196         struct fd_event *fde;
1197         const char *domain_socket_name;
1198
1199         /* create a unix domain stream socket to listen to */
1200         res = ux_socket_bind(ctdb);
1201         if (res!=0) {
1202                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1203                 exit(10);
1204         }
1205
1206         if (do_fork && fork()) {
1207                 return 0;
1208         }
1209
1210         tdb_reopen_all(false);
1211
1212         if (do_fork) {
1213                 if (setsid() == -1) {
1214                         ctdb_die(ctdb, "Failed to setsid()\n");
1215                 }
1216                 close(0);
1217                 if (open("/dev/null", O_RDONLY) != 0) {
1218                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1219                         exit(11);
1220                 }
1221         }
1222         ignore_signal(SIGPIPE);
1223
1224         ctdb->ctdbd_pid = getpid();
1225         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1226                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1227         ctdb_create_pidfile(ctdb->ctdbd_pid);
1228
1229         /* Make sure we log something when the daemon terminates.
1230          * This must be the first exit handler to run (so the last to
1231          * be registered.
1232          */
1233         atexit(print_exit_message);
1234
1235         if (ctdb->do_setsched) {
1236                 /* try to set us up as realtime */
1237                 if (!set_scheduler()) {
1238                         exit(1);
1239                 }
1240                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1241         }
1242
1243         /* ensure the socket is deleted on exit of the daemon */
1244         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1245         if (domain_socket_name == NULL) {
1246                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1247                 exit(12);
1248         }
1249
1250         ctdb->ev = event_context_init(NULL);
1251         tevent_loop_allow_nesting(ctdb->ev);
1252         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1253         ret = ctdb_init_tevent_logging(ctdb);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1256                 exit(1);
1257         }
1258
1259         /* set up a handler to pick up sigchld */
1260         if (ctdb_init_sigchld(ctdb) == NULL) {
1261                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1262                 exit(1);
1263         }
1264
1265         ctdb_set_child_logging(ctdb);
1266
1267         /* initialize statistics collection */
1268         ctdb_statistics_init(ctdb);
1269
1270         /* force initial recovery for election */
1271         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1272
1273         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1274         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1275         if (ret != 0) {
1276                 ctdb_die(ctdb, "Failed to run init event\n");
1277         }
1278         ctdb_run_notification_script(ctdb, "init");
1279
1280         if (strcmp(ctdb->transport, "tcp") == 0) {
1281                 ret = ctdb_tcp_init(ctdb);
1282         }
1283 #ifdef USE_INFINIBAND
1284         if (strcmp(ctdb->transport, "ib") == 0) {
1285                 ret = ctdb_ibw_init(ctdb);
1286         }
1287 #endif
1288         if (ret != 0) {
1289                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1290                 return -1;
1291         }
1292
1293         if (ctdb->methods == NULL) {
1294                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1295                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1296         }
1297
1298         /* Initialise the transport.  This sets the node address if it
1299          * was not set via the command-line. */
1300         if (ctdb->methods->initialise(ctdb) != 0) {
1301                 ctdb_fatal(ctdb, "transport failed to initialise");
1302         }
1303
1304         ctdb_set_my_pnn(ctdb);
1305
1306         initialise_node_flags(ctdb);
1307
1308         if (ctdb->public_addresses_file) {
1309                 ret = ctdb_set_public_addresses(ctdb, true);
1310                 if (ret == -1) {
1311                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1312                         exit(1);
1313                 }
1314                 if (ctdb->do_checkpublicip) {
1315                         ctdb_start_monitoring_interfaces(ctdb);
1316                 }
1317         }
1318
1319         ctdb_initialise_vnn_map(ctdb);
1320
1321         /* attach to existing databases */
1322         if (ctdb_attach_databases(ctdb) != 0) {
1323                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1324         }
1325
1326         /* start frozen, then let the first election sort things out */
1327         if (!ctdb_blocking_freeze(ctdb)) {
1328                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1329         }
1330
1331         /* now start accepting clients, only can do this once frozen */
1332         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1333                            EVENT_FD_READ,
1334                            ctdb_accept_client, ctdb);
1335         if (fde == NULL) {
1336                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1337         }
1338         tevent_fd_set_auto_close(fde);
1339
1340         /* release any IPs we hold from previous runs of the daemon */
1341         if (ctdb->tunable.disable_ip_failover == 0) {
1342                 ctdb_release_all_ips(ctdb);
1343         }
1344
1345         /* Start the transport */
1346         if (ctdb->methods->start(ctdb) != 0) {
1347                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1348                 ctdb_fatal(ctdb, "transport failed to start");
1349         }
1350
1351         /* Recovery daemon and timed events are started from the
1352          * callback, only after the setup event completes
1353          * successfully.
1354          */
1355         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1356         ret = ctdb_event_script_callback(ctdb,
1357                                          ctdb,
1358                                          ctdb_setup_event_callback,
1359                                          ctdb,
1360                                          CTDB_EVENT_SETUP,
1361                                          "%s",
1362                                          "");
1363         if (ret != 0) {
1364                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1365                 exit(1);
1366         }
1367
1368         lockdown_memory(ctdb->valgrinding);
1369
1370         /* go into a wait loop to allow other nodes to complete */
1371         event_loop_wait(ctdb->ev);
1372
1373         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1374         exit(1);
1375 }
1376
1377 /*
1378   allocate a packet for use in daemon<->daemon communication
1379  */
1380 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1381                                                  TALLOC_CTX *mem_ctx, 
1382                                                  enum ctdb_operation operation, 
1383                                                  size_t length, size_t slength,
1384                                                  const char *type)
1385 {
1386         int size;
1387         struct ctdb_req_header *hdr;
1388
1389         length = MAX(length, slength);
1390         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1391
1392         if (ctdb->methods == NULL) {
1393                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1394                          operation, (unsigned)length));
1395                 return NULL;
1396         }
1397
1398         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1399         if (hdr == NULL) {
1400                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1401                          operation, (unsigned)length));
1402                 return NULL;
1403         }
1404         talloc_set_name_const(hdr, type);
1405         memset(hdr, 0, slength);
1406         hdr->length       = length;
1407         hdr->operation    = operation;
1408         hdr->ctdb_magic   = CTDB_MAGIC;
1409         hdr->ctdb_version = CTDB_PROTOCOL;
1410         hdr->generation   = ctdb->vnn_map->generation;
1411         hdr->srcnode      = ctdb->pnn;
1412
1413         return hdr;     
1414 }
1415
1416 struct daemon_control_state {
1417         struct daemon_control_state *next, *prev;
1418         struct ctdb_client *client;
1419         struct ctdb_req_control *c;
1420         uint32_t reqid;
1421         struct ctdb_node *node;
1422 };
1423
1424 /*
1425   callback when a control reply comes in
1426  */
1427 static void daemon_control_callback(struct ctdb_context *ctdb,
1428                                     int32_t status, TDB_DATA data, 
1429                                     const char *errormsg,
1430                                     void *private_data)
1431 {
1432         struct daemon_control_state *state = talloc_get_type(private_data, 
1433                                                              struct daemon_control_state);
1434         struct ctdb_client *client = state->client;
1435         struct ctdb_reply_control *r;
1436         size_t len;
1437         int ret;
1438
1439         /* construct a message to send to the client containing the data */
1440         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1441         if (errormsg) {
1442                 len += strlen(errormsg);
1443         }
1444         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1445                                struct ctdb_reply_control);
1446         CTDB_NO_MEMORY_VOID(ctdb, r);
1447
1448         r->hdr.reqid     = state->reqid;
1449         r->status        = status;
1450         r->datalen       = data.dsize;
1451         r->errorlen = 0;
1452         memcpy(&r->data[0], data.dptr, data.dsize);
1453         if (errormsg) {
1454                 r->errorlen = strlen(errormsg);
1455                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1456         }
1457
1458         ret = daemon_queue_send(client, &r->hdr);
1459         if (ret != -1) {
1460                 talloc_free(state);
1461         }
1462 }
1463
1464 /*
1465   fail all pending controls to a disconnected node
1466  */
1467 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1468 {
1469         struct daemon_control_state *state;
1470         while ((state = node->pending_controls)) {
1471                 DLIST_REMOVE(node->pending_controls, state);
1472                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1473                                         "node is disconnected", state);
1474         }
1475 }
1476
1477 /*
1478   destroy a daemon_control_state
1479  */
1480 static int daemon_control_destructor(struct daemon_control_state *state)
1481 {
1482         if (state->node) {
1483                 DLIST_REMOVE(state->node->pending_controls, state);
1484         }
1485         return 0;
1486 }
1487
1488 /*
1489   this is called when the ctdb daemon received a ctdb request control
1490   from a local client over the unix domain socket
1491  */
1492 static void daemon_request_control_from_client(struct ctdb_client *client, 
1493                                                struct ctdb_req_control *c)
1494 {
1495         TDB_DATA data;
1496         int res;
1497         struct daemon_control_state *state;
1498         TALLOC_CTX *tmp_ctx = talloc_new(client);
1499
1500         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1501                 c->hdr.destnode = client->ctdb->pnn;
1502         }
1503
1504         state = talloc(client, struct daemon_control_state);
1505         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1506
1507         state->client = client;
1508         state->c = talloc_steal(state, c);
1509         state->reqid = c->hdr.reqid;
1510         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1511                 state->node = client->ctdb->nodes[c->hdr.destnode];
1512                 DLIST_ADD(state->node->pending_controls, state);
1513         } else {
1514                 state->node = NULL;
1515         }
1516
1517         talloc_set_destructor(state, daemon_control_destructor);
1518
1519         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1520                 talloc_steal(tmp_ctx, state);
1521         }
1522         
1523         data.dptr = &c->data[0];
1524         data.dsize = c->datalen;
1525         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1526                                        c->srvid, c->opcode, client->client_id,
1527                                        c->flags,
1528                                        data, daemon_control_callback,
1529                                        state);
1530         if (res != 0) {
1531                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1532                          c->hdr.destnode));
1533         }
1534
1535         talloc_free(tmp_ctx);
1536 }
1537
1538 /*
1539   register a call function
1540 */
1541 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1542                          ctdb_fn_t fn, int id)
1543 {
1544         struct ctdb_registered_call *call;
1545         struct ctdb_db_context *ctdb_db;
1546
1547         ctdb_db = find_ctdb_db(ctdb, db_id);
1548         if (ctdb_db == NULL) {
1549                 return -1;
1550         }
1551
1552         call = talloc(ctdb_db, struct ctdb_registered_call);
1553         call->fn = fn;
1554         call->id = id;
1555
1556         DLIST_ADD(ctdb_db->calls, call);        
1557         return 0;
1558 }
1559
1560
1561
1562 /*
1563   this local messaging handler is ugly, but is needed to prevent
1564   recursion in ctdb_send_message() when the destination node is the
1565   same as the source node
1566  */
1567 struct ctdb_local_message {
1568         struct ctdb_context *ctdb;
1569         uint64_t srvid;
1570         TDB_DATA data;
1571 };
1572
1573 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1574                                        struct timeval t, void *private_data)
1575 {
1576         struct ctdb_local_message *m = talloc_get_type(private_data, 
1577                                                        struct ctdb_local_message);
1578         int res;
1579
1580         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1581         if (res != 0) {
1582                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1583                           (unsigned long long)m->srvid));
1584         }
1585         talloc_free(m);
1586 }
1587
1588 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1589 {
1590         struct ctdb_local_message *m;
1591         m = talloc(ctdb, struct ctdb_local_message);
1592         CTDB_NO_MEMORY(ctdb, m);
1593
1594         m->ctdb = ctdb;
1595         m->srvid = srvid;
1596         m->data  = data;
1597         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1598         if (m->data.dptr == NULL) {
1599                 talloc_free(m);
1600                 return -1;
1601         }
1602
1603         /* this needs to be done as an event to prevent recursion */
1604         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1605         return 0;
1606 }
1607
1608 /*
1609   send a ctdb message
1610 */
1611 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1612                              uint64_t srvid, TDB_DATA data)
1613 {
1614         struct ctdb_req_message *r;
1615         int len;
1616
1617         if (ctdb->methods == NULL) {
1618                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1619                 return -1;
1620         }
1621
1622         /* see if this is a message to ourselves */
1623         if (pnn == ctdb->pnn) {
1624                 return ctdb_local_message(ctdb, srvid, data);
1625         }
1626
1627         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1628         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1629                                     struct ctdb_req_message);
1630         CTDB_NO_MEMORY(ctdb, r);
1631
1632         r->hdr.destnode  = pnn;
1633         r->srvid         = srvid;
1634         r->datalen       = data.dsize;
1635         memcpy(&r->data[0], data.dptr, data.dsize);
1636
1637         ctdb_queue_packet(ctdb, &r->hdr);
1638
1639         talloc_free(r);
1640         return 0;
1641 }
1642
1643
1644
1645 struct ctdb_client_notify_list {
1646         struct ctdb_client_notify_list *next, *prev;
1647         struct ctdb_context *ctdb;
1648         uint64_t srvid;
1649         TDB_DATA data;
1650 };
1651
1652
1653 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1654 {
1655         int ret;
1656
1657         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1658
1659         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1660         if (ret != 0) {
1661                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1662         }
1663
1664         return 0;
1665 }
1666
1667 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1668 {
1669         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1670         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1671         struct ctdb_client_notify_list *nl;
1672
1673         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1674
1675         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1676                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1677                 return -1;
1678         }
1679
1680         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1681                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1682                 return -1;
1683         }
1684
1685
1686         if (client == NULL) {
1687                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1688                 return -1;
1689         }
1690
1691         for(nl=client->notify; nl; nl=nl->next) {
1692                 if (nl->srvid == notify->srvid) {
1693                         break;
1694                 }
1695         }
1696         if (nl != NULL) {
1697                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1698                 return -1;
1699         }
1700
1701         nl = talloc(client, struct ctdb_client_notify_list);
1702         CTDB_NO_MEMORY(ctdb, nl);
1703         nl->ctdb       = ctdb;
1704         nl->srvid      = notify->srvid;
1705         nl->data.dsize = notify->len;
1706         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1707         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1708         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1709         
1710         DLIST_ADD(client->notify, nl);
1711         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1712
1713         return 0;
1714 }
1715
1716 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1717 {
1718         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1719         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1720         struct ctdb_client_notify_list *nl;
1721
1722         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1723
1724         if (client == NULL) {
1725                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1726                 return -1;
1727         }
1728
1729         for(nl=client->notify; nl; nl=nl->next) {
1730                 if (nl->srvid == notify->srvid) {
1731                         break;
1732                 }
1733         }
1734         if (nl == NULL) {
1735                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1736                 return -1;
1737         }
1738
1739         DLIST_REMOVE(client->notify, nl);
1740         talloc_set_destructor(nl, NULL);
1741         talloc_free(nl);
1742
1743         return 0;
1744 }
1745
1746 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1747 {
1748         struct ctdb_client_pid_list *client_pid;
1749
1750         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1751                 if (client_pid->pid == pid) {
1752                         return client_pid->client;
1753                 }
1754         }
1755         return NULL;
1756 }
1757
1758
1759 /* This control is used by samba when probing if a process (of a samba daemon)
1760    exists on the node.
1761    Samba does this when it needs/wants to check if a subrecord in one of the
1762    databases is still valied, or if it is stale and can be removed.
1763    If the node is in unhealthy or stopped state we just kill of the samba
1764    process holding htis sub-record and return to the calling samba that
1765    the process does not exist.
1766    This allows us to forcefully recall subrecords registered by samba processes
1767    on banned and stopped nodes.
1768 */
1769 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1770 {
1771         struct ctdb_client *client;
1772
1773         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1774                 client = ctdb_find_client_by_pid(ctdb, pid);
1775                 if (client != NULL) {
1776                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1777                         talloc_free(client);
1778                 }
1779                 return -1;
1780         }
1781
1782         return kill(pid, 0);
1783 }
1784
1785 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1786 {
1787         struct ctdb_node_map *node_map = NULL;
1788
1789         CHECK_CONTROL_DATA_SIZE(0);
1790
1791         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1792         if (node_map == NULL) {
1793                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1794                 return -1;
1795         }
1796
1797         outdata->dptr  = (unsigned char *)node_map;
1798         outdata->dsize = talloc_get_size(outdata->dptr);
1799
1800         return 0;
1801 }
1802
1803 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1804 {
1805         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1806                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1807                 return;
1808         }
1809
1810         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1811         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1812         ctdb_stop_recoverd(ctdb);
1813         ctdb_stop_keepalive(ctdb);
1814         ctdb_stop_monitoring(ctdb);
1815         ctdb_release_all_ips(ctdb);
1816         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1817         if (ctdb->methods != NULL) {
1818                 ctdb->methods->shutdown(ctdb);
1819         }
1820
1821         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1822         exit(exit_code);
1823 }