ctdb-daemon: Stop using tevent compatibility definitions
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb_wrap/tdb_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32 #include "common/reqid.h"
33 #include "common/system.h"
34
35 struct ctdb_client_pid_list {
36         struct ctdb_client_pid_list *next, *prev;
37         struct ctdb_context *ctdb;
38         pid_t pid;
39         struct ctdb_client *client;
40 };
41
42 const char *ctdbd_pidfile = NULL;
43
44 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
45
46 static void print_exit_message(void)
47 {
48         if (debug_extra != NULL && debug_extra[0] != '\0') {
49                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
50         } else {
51                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
52
53                 /* Wait a second to allow pending log messages to be flushed */
54                 sleep(1);
55         }
56 }
57
58
59
60 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
61                                   struct timeval t, void *private_data)
62 {
63         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
64
65         if (getpid() != ctdb->ctdbd_pid) {
66                 return;
67         }
68
69         tevent_add_timer(ctdb->ev, ctdb,
70                          timeval_current_ofs(1, 0),
71                          ctdb_time_tick, ctdb);
72 }
73
74 /* Used to trigger a dummy event once per second, to make
75  * detection of hangs more reliable.
76  */
77 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
78 {
79         tevent_add_timer(ctdb->ev, ctdb,
80                          timeval_current_ofs(1, 0),
81                          ctdb_time_tick, ctdb);
82 }
83
84 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
85 {
86         /* start monitoring for connected/disconnected nodes */
87         ctdb_start_keepalive(ctdb);
88
89         /* start periodic update of tcp tickle lists */
90         ctdb_start_tcp_tickle_update(ctdb);
91
92         /* start listening for recovery daemon pings */
93         ctdb_control_recd_ping(ctdb);
94
95         /* start listening to timer ticks */
96         ctdb_start_time_tickd(ctdb);
97 }
98
99 static void ignore_signal(int signum)
100 {
101         struct sigaction act;
102
103         memset(&act, 0, sizeof(act));
104
105         act.sa_handler = SIG_IGN;
106         sigemptyset(&act.sa_mask);
107         sigaddset(&act.sa_mask, signum);
108         sigaction(signum, &act, NULL);
109 }
110
111
112 /*
113   send a packet to a client
114  */
115 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
116 {
117         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
118         if (hdr->operation == CTDB_REQ_MESSAGE) {
119                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
120                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
121                         talloc_free(client);
122                         return -1;
123                 }
124         }
125         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
126 }
127
128 /*
129   message handler for when we are in daemon mode. This redirects the message
130   to the right client
131  */
132 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
133                                    void *private_data)
134 {
135         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
136         struct ctdb_req_message *r;
137         int len;
138
139         /* construct a message to send to the client containing the data */
140         len = offsetof(struct ctdb_req_message, data) + data.dsize;
141         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
142                                len, struct ctdb_req_message);
143         CTDB_NO_MEMORY_VOID(client->ctdb, r);
144
145         talloc_set_name_const(r, "req_message packet");
146
147         r->srvid         = srvid;
148         r->datalen       = data.dsize;
149         memcpy(&r->data[0], data.dptr, data.dsize);
150
151         daemon_queue_send(client, &r->hdr);
152
153         talloc_free(r);
154 }
155
156 /*
157   this is called when the ctdb daemon received a ctdb request to 
158   set the srvid from the client
159  */
160 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
161 {
162         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
163         int res;
164         if (client == NULL) {
165                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
166                 return -1;
167         }
168         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
169                              client);
170         if (res != 0) {
171                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
172                          (unsigned long long)srvid));
173         } else {
174                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
175                          (unsigned long long)srvid));
176         }
177
178         return res;
179 }
180
181 /*
182   this is called when the ctdb daemon received a ctdb request to 
183   remove a srvid from the client
184  */
185 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
186 {
187         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
188         if (client == NULL) {
189                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
190                 return -1;
191         }
192         return srvid_deregister(ctdb->srv, srvid, client);
193 }
194
195 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
196                         TDB_DATA *outdata)
197 {
198         uint64_t *ids;
199         int i, num_ids;
200         uint8_t *results;
201
202         if ((indata.dsize % sizeof(uint64_t)) != 0) {
203                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
204                                   "size=%d\n", (int)indata.dsize));
205                 return -1;
206         }
207
208         ids = (uint64_t *)indata.dptr;
209         num_ids = indata.dsize / 8;
210
211         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
212         if (results == NULL) {
213                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
214                 return -1;
215         }
216         for (i=0; i<num_ids; i++) {
217                 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
218                         results[i/8] |= (1 << (i%8));
219                 }
220         }
221         outdata->dptr = (uint8_t *)results;
222         outdata->dsize = talloc_get_size(results);
223         return 0;
224 }
225
226 /*
227   destroy a ctdb_client
228 */
229 static int ctdb_client_destructor(struct ctdb_client *client)
230 {
231         struct ctdb_db_context *ctdb_db;
232
233         ctdb_takeover_client_destructor_hook(client);
234         reqid_remove(client->ctdb->idr, client->client_id);
235         client->ctdb->num_clients--;
236
237         if (client->num_persistent_updates != 0) {
238                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
239                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
240         }
241         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
242         if (ctdb_db) {
243                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
244                                   "commit active. Forcing recovery.\n"));
245                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
246
247                 /*
248                  * trans3 transaction state:
249                  *
250                  * The destructor sets the pointer to NULL.
251                  */
252                 talloc_free(ctdb_db->persistent_state);
253         }
254
255         return 0;
256 }
257
258
259 /*
260   this is called when the ctdb daemon received a ctdb request message
261   from a local client over the unix domain socket
262  */
263 static void daemon_request_message_from_client(struct ctdb_client *client, 
264                                                struct ctdb_req_message *c)
265 {
266         TDB_DATA data;
267         int res;
268
269         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
270                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
271         }
272
273         /* maybe the message is for another client on this node */
274         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
275                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
276                 return;
277         }
278
279         /* its for a remote node */
280         data.dptr = &c->data[0];
281         data.dsize = c->datalen;
282         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
283                                        c->srvid, data);
284         if (res != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
286                          c->hdr.destnode));
287         }
288 }
289
290
291 struct daemon_call_state {
292         struct ctdb_client *client;
293         uint32_t reqid;
294         struct ctdb_call *call;
295         struct timeval start_time;
296
297         /* readonly request ? */
298         uint32_t readonly_fetch;
299         uint32_t client_callid;
300 };
301
302 /* 
303    complete a call from a client 
304 */
305 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
306 {
307         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
308                                                            struct daemon_call_state);
309         struct ctdb_reply_call *r;
310         int res;
311         uint32_t length;
312         struct ctdb_client *client = dstate->client;
313         struct ctdb_db_context *ctdb_db = state->ctdb_db;
314
315         talloc_steal(client, dstate);
316         talloc_steal(dstate, dstate->call);
317
318         res = ctdb_daemon_call_recv(state, dstate->call);
319         if (res != 0) {
320                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
321                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
322
323                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
324                 return;
325         }
326
327         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
328         /* If the client asked for readonly FETCH, we remapped this to 
329            FETCH_WITH_HEADER when calling the daemon. So we must
330            strip the extra header off the reply data before passing
331            it back to the client.
332         */
333         if (dstate->readonly_fetch
334         && dstate->client_callid == CTDB_FETCH_FUNC) {
335                 length -= sizeof(struct ctdb_ltdb_header);
336         }
337
338         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
339                                length, struct ctdb_reply_call);
340         if (r == NULL) {
341                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
342                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
344                 return;
345         }
346         r->hdr.reqid        = dstate->reqid;
347         r->status           = dstate->call->status;
348
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 /* client only asked for a FETCH so we must strip off
352                    the extra ctdb_ltdb header
353                 */
354                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
356         } else {
357                 r->datalen          = dstate->call->reply_data.dsize;
358                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
359         }
360
361         res = daemon_queue_send(client, &r->hdr);
362         if (res == -1) {
363                 /* client is dead - return immediately */
364                 return;
365         }
366         if (res != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
368         }
369         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
370         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
371         talloc_free(dstate);
372 }
373
374 struct ctdb_daemon_packet_wrap {
375         struct ctdb_context *ctdb;
376         uint32_t client_id;
377 };
378
379 /*
380   a wrapper to catch disconnected clients
381  */
382 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
383 {
384         struct ctdb_client *client;
385         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
386                                                             struct ctdb_daemon_packet_wrap);
387         if (w == NULL) {
388                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
389                 return;
390         }
391
392         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
393         if (client == NULL) {
394                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
395                          w->client_id));
396                 talloc_free(w);
397                 return;
398         }
399         talloc_free(w);
400
401         /* process it */
402         daemon_incoming_packet(client, hdr);    
403 }
404
405 struct ctdb_deferred_fetch_call {
406         struct ctdb_deferred_fetch_call *next, *prev;
407         struct ctdb_req_call *c;
408         struct ctdb_daemon_packet_wrap *w;
409 };
410
411 struct ctdb_deferred_fetch_queue {
412         struct ctdb_deferred_fetch_call *deferred_calls;
413 };
414
415 struct ctdb_deferred_requeue {
416         struct ctdb_deferred_fetch_call *dfc;
417         struct ctdb_client *client;
418 };
419
420 /* called from a timer event and starts reprocessing the deferred call.*/
421 static void reprocess_deferred_call(struct tevent_context *ev,
422                                     struct tevent_timer *te,
423                                     struct timeval t, void *private_data)
424 {
425         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
426         struct ctdb_client *client = dfr->client;
427
428         talloc_steal(client, dfr->dfc->c);
429         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
430         talloc_free(dfr);
431 }
432
433 /* the referral context is destroyed either after a timeout or when the initial
434    fetch-lock has finished.
435    at this stage, immediately start reprocessing the queued up deferred
436    calls so they get reprocessed immediately (and since we are dmaster at
437    this stage, trigger the waiting smbd processes to pick up and aquire the
438    record right away.
439 */
440 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
441 {
442
443         /* need to reprocess the packets from the queue explicitely instead of
444            just using a normal destructor since we want, need, to
445            call the clients in the same oder as the requests queued up
446         */
447         while (dfq->deferred_calls != NULL) {
448                 struct ctdb_client *client;
449                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
450                 struct ctdb_deferred_requeue *dfr;
451
452                 DLIST_REMOVE(dfq->deferred_calls, dfc);
453
454                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
455                 if (client == NULL) {
456                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
457                                  dfc->w->client_id));
458                         continue;
459                 }
460
461                 /* process it by pushing it back onto the eventloop */
462                 dfr = talloc(client, struct ctdb_deferred_requeue);
463                 if (dfr == NULL) {
464                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
465                         continue;
466                 }
467
468                 dfr->dfc    = talloc_steal(dfr, dfc);
469                 dfr->client = client;
470
471                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
472                                  reprocess_deferred_call, dfr);
473         }
474
475         return 0;
476 }
477
478 /* insert the new deferral context into the rb tree.
479    there should never be a pre-existing context here, but check for it
480    warn and destroy the previous context if there is already a deferral context
481    for this key.
482 */
483 static void *insert_dfq_callback(void *parm, void *data)
484 {
485         if (data) {
486                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
487                 talloc_free(data);
488         }
489         return parm;
490 }
491
492 /* if the original fetch-lock did not complete within a reasonable time,
493    free the context and context for all deferred requests to cause them to be
494    re-inserted into the event system.
495 */
496 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
497                         struct timeval t, void *private_data)
498 {
499         talloc_free(private_data);
500 }
501
502 /* This function is used in the local daemon to register a KEY in a database
503    for being "fetched"
504    While the remote fetch is in-flight, any futher attempts to re-fetch the
505    same record will be deferred until the fetch completes.
506 */
507 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
508 {
509         uint32_t *k;
510         struct ctdb_deferred_fetch_queue *dfq;
511
512         k = ctdb_key_to_idkey(call, call->key);
513         if (k == NULL) {
514                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
515                 return -1;
516         }
517
518         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
519         if (dfq == NULL) {
520                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
521                 talloc_free(k);
522                 return -1;
523         }
524         dfq->deferred_calls = NULL;
525
526         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
527
528         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
529
530         /* if the fetch havent completed in 30 seconds, just tear it all down
531            and let it try again as the events are reissued */
532         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
533                          dfq_timeout, dfq);
534
535         talloc_free(k);
536         return 0;
537 }
538
539 /* check if this is a duplicate request to a fetch already in-flight
540    if it is, make this call deferred to be reprocessed later when
541    the in-flight fetch completes.
542 */
543 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
544 {
545         uint32_t *k;
546         struct ctdb_deferred_fetch_queue *dfq;
547         struct ctdb_deferred_fetch_call *dfc;
548
549         k = ctdb_key_to_idkey(c, key);
550         if (k == NULL) {
551                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
552                 return -1;
553         }
554
555         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
556         if (dfq == NULL) {
557                 talloc_free(k);
558                 return -1;
559         }
560
561
562         talloc_free(k);
563
564         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
565         if (dfc == NULL) {
566                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
567                 return -1;
568         }
569
570         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
571         if (dfc->w == NULL) {
572                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
573                 talloc_free(dfc);
574                 return -1;
575         }
576
577         dfc->c = talloc_steal(dfc, c);
578         dfc->w->ctdb = ctdb_db->ctdb;
579         dfc->w->client_id = client->client_id;
580
581         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
582
583         return 0;
584 }
585
586
587 /*
588   this is called when the ctdb daemon received a ctdb request call
589   from a local client over the unix domain socket
590  */
591 static void daemon_request_call_from_client(struct ctdb_client *client, 
592                                             struct ctdb_req_call *c)
593 {
594         struct ctdb_call_state *state;
595         struct ctdb_db_context *ctdb_db;
596         struct daemon_call_state *dstate;
597         struct ctdb_call *call;
598         struct ctdb_ltdb_header header;
599         TDB_DATA key, data;
600         int ret;
601         struct ctdb_context *ctdb = client->ctdb;
602         struct ctdb_daemon_packet_wrap *w;
603
604         CTDB_INCREMENT_STAT(ctdb, total_calls);
605         CTDB_INCREMENT_STAT(ctdb, pending_calls);
606
607         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
608         if (!ctdb_db) {
609                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
610                           c->db_id));
611                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
612                 return;
613         }
614
615         if (ctdb_db->unhealthy_reason) {
616                 /*
617                  * this is just a warning, as the tdb should be empty anyway,
618                  * and only persistent databases can be unhealthy, which doesn't
619                  * use this code patch
620                  */
621                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
622                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
623         }
624
625         key.dptr = c->data;
626         key.dsize = c->keylen;
627
628         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
629         CTDB_NO_MEMORY_VOID(ctdb, w);   
630
631         w->ctdb = ctdb;
632         w->client_id = client->client_id;
633
634         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
635                                            (struct ctdb_req_header *)c, &data,
636                                            daemon_incoming_packet_wrap, w, true);
637         if (ret == -2) {
638                 /* will retry later */
639                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
640                 return;
641         }
642
643         talloc_free(w);
644
645         if (ret != 0) {
646                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
647                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
648                 return;
649         }
650
651
652         /* check if this fetch request is a duplicate for a
653            request we already have in flight. If so defer it until
654            the first request completes.
655         */
656         if (ctdb->tunable.fetch_collapse == 1) {
657                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
658                         ret = ctdb_ltdb_unlock(ctdb_db, key);
659                         if (ret != 0) {
660                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
661                         }
662                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
663                         return;
664                 }
665         }
666
667         /* Dont do READONLY if we dont have a tracking database */
668         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
669                 c->flags &= ~CTDB_WANT_READONLY;
670         }
671
672         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
673                 header.flags &= ~CTDB_REC_RO_FLAGS;
674                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
675                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
676                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
677                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
678                 }
679                 /* and clear out the tracking data */
680                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
681                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
682                 }
683         }
684
685         /* if we are revoking, we must defer all other calls until the revoke
686          * had completed.
687          */
688         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
689                 talloc_free(data.dptr);
690                 ret = ctdb_ltdb_unlock(ctdb_db, key);
691
692                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
693                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
694                 }
695                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
696                 return;
697         }
698
699         if ((header.dmaster == ctdb->pnn)
700         && (!(c->flags & CTDB_WANT_READONLY))
701         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
702                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
703                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
704                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
705                 }
706                 ret = ctdb_ltdb_unlock(ctdb_db, key);
707
708                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
709                         ctdb_fatal(ctdb, "Failed to start record revoke");
710                 }
711                 talloc_free(data.dptr);
712
713                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
714                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
715                 }
716
717                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
718                 return;
719         }               
720
721         dstate = talloc(client, struct daemon_call_state);
722         if (dstate == NULL) {
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724                 if (ret != 0) {
725                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
726                 }
727
728                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
729                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
730                 return;
731         }
732         dstate->start_time = timeval_current();
733         dstate->client = client;
734         dstate->reqid  = c->hdr.reqid;
735         talloc_steal(dstate, data.dptr);
736
737         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
738         if (call == NULL) {
739                 ret = ctdb_ltdb_unlock(ctdb_db, key);
740                 if (ret != 0) {
741                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
742                 }
743
744                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
745                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
746                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
747                 return;
748         }
749
750         dstate->readonly_fetch = 0;
751         call->call_id = c->callid;
752         call->key = key;
753         call->call_data.dptr = c->data + c->keylen;
754         call->call_data.dsize = c->calldatalen;
755         call->flags = c->flags;
756
757         if (c->flags & CTDB_WANT_READONLY) {
758                 /* client wants readonly record, so translate this into a 
759                    fetch with header. remember what the client asked for
760                    so we can remap the reply back to the proper format for
761                    the client in the reply
762                  */
763                 dstate->client_callid = call->call_id;
764                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
765                 dstate->readonly_fetch = 1;
766         }
767
768         if (header.dmaster == ctdb->pnn) {
769                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
770         } else {
771                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
772                 if (ctdb->tunable.fetch_collapse == 1) {
773                         /* This request triggered a remote fetch-lock.
774                            set up a deferral for this key so any additional
775                            fetch-locks are deferred until the current one
776                            finishes.
777                          */
778                         setup_deferred_fetch_locks(ctdb_db, call);
779                 }
780         }
781
782         ret = ctdb_ltdb_unlock(ctdb_db, key);
783         if (ret != 0) {
784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
785         }
786
787         if (state == NULL) {
788                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
789                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
790                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
791                 return;
792         }
793         talloc_steal(state, dstate);
794         talloc_steal(client, state);
795
796         state->async.fn = daemon_call_from_client_callback;
797         state->async.private_data = dstate;
798 }
799
800
801 static void daemon_request_control_from_client(struct ctdb_client *client, 
802                                                struct ctdb_req_control *c);
803
804 /* data contains a packet from the client */
805 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
806 {
807         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
808         TALLOC_CTX *tmp_ctx;
809         struct ctdb_context *ctdb = client->ctdb;
810
811         /* place the packet as a child of a tmp_ctx. We then use
812            talloc_free() below to free it. If any of the calls want
813            to keep it, then they will steal it somewhere else, and the
814            talloc_free() will be a no-op */
815         tmp_ctx = talloc_new(client);
816         talloc_steal(tmp_ctx, hdr);
817
818         if (hdr->ctdb_magic != CTDB_MAGIC) {
819                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
820                 goto done;
821         }
822
823         if (hdr->ctdb_version != CTDB_PROTOCOL) {
824                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
825                 goto done;
826         }
827
828         switch (hdr->operation) {
829         case CTDB_REQ_CALL:
830                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
831                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
832                 break;
833
834         case CTDB_REQ_MESSAGE:
835                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
836                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
837                 break;
838
839         case CTDB_REQ_CONTROL:
840                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
841                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
842                 break;
843
844         default:
845                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
846                          hdr->operation));
847         }
848
849 done:
850         talloc_free(tmp_ctx);
851 }
852
853 /*
854   called when the daemon gets a incoming packet
855  */
856 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
857 {
858         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
859         struct ctdb_req_header *hdr;
860
861         if (cnt == 0) {
862                 talloc_free(client);
863                 return;
864         }
865
866         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
867
868         if (cnt < sizeof(*hdr)) {
869                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
870                                (unsigned)cnt);
871                 return;
872         }
873         hdr = (struct ctdb_req_header *)data;
874         if (cnt != hdr->length) {
875                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
876                                (unsigned)hdr->length, (unsigned)cnt);
877                 return;
878         }
879
880         if (hdr->ctdb_magic != CTDB_MAGIC) {
881                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
882                 return;
883         }
884
885         if (hdr->ctdb_version != CTDB_PROTOCOL) {
886                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
887                 return;
888         }
889
890         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
891                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
892                  hdr->srcnode, hdr->destnode));
893
894         /* it is the responsibility of the incoming packet function to free 'data' */
895         daemon_incoming_packet(client, hdr);
896 }
897
898
899 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
900 {
901         if (client_pid->ctdb->client_pids != NULL) {
902                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
903         }
904
905         return 0;
906 }
907
908
909 static void ctdb_accept_client(struct tevent_context *ev,
910                                struct tevent_fd *fde, uint16_t flags,
911                                void *private_data)
912 {
913         struct sockaddr_un addr;
914         socklen_t len;
915         int fd;
916         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
917         struct ctdb_client *client;
918         struct ctdb_client_pid_list *client_pid;
919         pid_t peer_pid = 0;
920
921         memset(&addr, 0, sizeof(addr));
922         len = sizeof(addr);
923         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
924         if (fd == -1) {
925                 return;
926         }
927
928         set_nonblocking(fd);
929         set_close_on_exec(fd);
930
931         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
932
933         client = talloc_zero(ctdb, struct ctdb_client);
934         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
935                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
936         }
937
938         client->ctdb = ctdb;
939         client->fd = fd;
940         client->client_id = reqid_new(ctdb->idr, client);
941         client->pid = peer_pid;
942
943         client_pid = talloc(client, struct ctdb_client_pid_list);
944         if (client_pid == NULL) {
945                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
946                 close(fd);
947                 talloc_free(client);
948                 return;
949         }               
950         client_pid->ctdb   = ctdb;
951         client_pid->pid    = peer_pid;
952         client_pid->client = client;
953
954         DLIST_ADD(ctdb->client_pids, client_pid);
955
956         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
957                                          ctdb_daemon_read_cb, client,
958                                          "client-%u", client->pid);
959
960         talloc_set_destructor(client, ctdb_client_destructor);
961         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
962         ctdb->num_clients++;
963 }
964
965
966
967 /*
968   create a unix domain socket and bind it
969   return a file descriptor open on the socket 
970 */
971 static int ux_socket_bind(struct ctdb_context *ctdb)
972 {
973         struct sockaddr_un addr;
974
975         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
976         if (ctdb->daemon.sd == -1) {
977                 return -1;
978         }
979
980         memset(&addr, 0, sizeof(addr));
981         addr.sun_family = AF_UNIX;
982         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
983
984         /* First check if an old ctdbd might be running */
985         if (connect(ctdb->daemon.sd,
986                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
987                 DEBUG(DEBUG_CRIT,
988                       ("Something is already listening on ctdb socket '%s'\n",
989                        ctdb->daemon.name));
990                 goto failed;
991         }
992
993         /* Remove any old socket */
994         unlink(ctdb->daemon.name);
995
996         set_close_on_exec(ctdb->daemon.sd);
997         set_nonblocking(ctdb->daemon.sd);
998
999         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1000                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1001                 goto failed;
1002         }
1003
1004         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1005             chmod(ctdb->daemon.name, 0700) != 0) {
1006                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1007                 goto failed;
1008         }
1009
1010
1011         if (listen(ctdb->daemon.sd, 100) != 0) {
1012                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1013                 goto failed;
1014         }
1015
1016         return 0;
1017
1018 failed:
1019         close(ctdb->daemon.sd);
1020         ctdb->daemon.sd = -1;
1021         return -1;      
1022 }
1023
1024 static void initialise_node_flags (struct ctdb_context *ctdb)
1025 {
1026         if (ctdb->pnn == -1) {
1027                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1028         }
1029
1030         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1031
1032         /* do we start out in DISABLED mode? */
1033         if (ctdb->start_as_disabled != 0) {
1034                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1035                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1036         }
1037         /* do we start out in STOPPED mode? */
1038         if (ctdb->start_as_stopped != 0) {
1039                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1040                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1041         }
1042 }
1043
1044 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1045                                       void *private_data)
1046 {
1047         if (status != 0) {
1048                 ctdb_die(ctdb, "Failed to run setup event");
1049         }
1050         ctdb_run_notification_script(ctdb, "setup");
1051
1052         /* tell all other nodes we've just started up */
1053         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1054                                  0, CTDB_CONTROL_STARTUP, 0,
1055                                  CTDB_CTRL_FLAG_NOREPLY,
1056                                  tdb_null, NULL, NULL);
1057
1058         /* Start the recovery daemon */
1059         if (ctdb_start_recoverd(ctdb) != 0) {
1060                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1061                 exit(11);
1062         }
1063
1064         ctdb_start_periodic_events(ctdb);
1065
1066         ctdb_wait_for_first_recovery(ctdb);
1067 }
1068
1069 static struct timeval tevent_before_wait_ts;
1070 static struct timeval tevent_after_wait_ts;
1071
1072 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1073                               void *private_data)
1074 {
1075         struct timeval diff;
1076         struct timeval now;
1077         struct ctdb_context *ctdb =
1078                 talloc_get_type(private_data, struct ctdb_context);
1079
1080         if (getpid() != ctdb->ctdbd_pid) {
1081                 return;
1082         }
1083
1084         now = timeval_current();
1085
1086         switch (tp) {
1087         case TEVENT_TRACE_BEFORE_WAIT:
1088                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1089                         diff = timeval_until(&tevent_after_wait_ts, &now);
1090                         if (diff.tv_sec > 3) {
1091                                 DEBUG(DEBUG_ERR,
1092                                       ("Handling event took %ld seconds!\n",
1093                                        (long)diff.tv_sec));
1094                         }
1095                 }
1096                 tevent_before_wait_ts = now;
1097                 break;
1098
1099         case TEVENT_TRACE_AFTER_WAIT:
1100                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1101                         diff = timeval_until(&tevent_before_wait_ts, &now);
1102                         if (diff.tv_sec > 3) {
1103                                 DEBUG(DEBUG_CRIT,
1104                                       ("No event for %ld seconds!\n",
1105                                        (long)diff.tv_sec));
1106                         }
1107                 }
1108                 tevent_after_wait_ts = now;
1109                 break;
1110
1111         default:
1112                 /* Do nothing for future tevent trace points */ ;
1113         }
1114 }
1115
1116 static void ctdb_remove_pidfile(void)
1117 {
1118         /* Only the main ctdbd's PID matches the SID */
1119         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1120                 if (unlink(ctdbd_pidfile) == 0) {
1121                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1122                                              ctdbd_pidfile));
1123                 } else {
1124                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1125                                               ctdbd_pidfile));
1126                 }
1127         }
1128 }
1129
1130 static void ctdb_create_pidfile(pid_t pid)
1131 {
1132         if (ctdbd_pidfile != NULL) {
1133                 FILE *fp;
1134
1135                 fp = fopen(ctdbd_pidfile, "w");
1136                 if (fp == NULL) {
1137                         DEBUG(DEBUG_ALERT,
1138                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1139                         exit(11);
1140                 }
1141
1142                 fprintf(fp, "%d\n", pid);
1143                 fclose(fp);
1144                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1145                 atexit(ctdb_remove_pidfile);
1146         }
1147 }
1148
1149 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1150 {
1151         int i, j, count;
1152
1153         /* initialize the vnn mapping table, skipping any deleted nodes */
1154         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1155         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1156
1157         count = 0;
1158         for (i = 0; i < ctdb->num_nodes; i++) {
1159                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1160                         count++;
1161                 }
1162         }
1163
1164         ctdb->vnn_map->generation = INVALID_GENERATION;
1165         ctdb->vnn_map->size = count;
1166         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1167         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1168
1169         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1170                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1171                         continue;
1172                 }
1173                 ctdb->vnn_map->map[j] = i;
1174                 j++;
1175         }
1176 }
1177
1178 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1179 {
1180         int nodeid;
1181
1182         if (ctdb->address == NULL) {
1183                 ctdb_fatal(ctdb,
1184                            "Can not determine PNN - node address is not set\n");
1185         }
1186
1187         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1188         if (nodeid == -1) {
1189                 ctdb_fatal(ctdb,
1190                            "Can not determine PNN - node address not found in node list\n");
1191         }
1192
1193         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1194         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1195 }
1196
1197 /*
1198   start the protocol going as a daemon
1199 */
1200 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1201 {
1202         int res, ret = -1;
1203         struct tevent_fd *fde;
1204
1205         /* create a unix domain stream socket to listen to */
1206         res = ux_socket_bind(ctdb);
1207         if (res!=0) {
1208                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1209                 exit(10);
1210         }
1211
1212         if (do_fork && fork()) {
1213                 return 0;
1214         }
1215
1216         tdb_reopen_all(false);
1217
1218         if (do_fork) {
1219                 if (setsid() == -1) {
1220                         ctdb_die(ctdb, "Failed to setsid()\n");
1221                 }
1222                 close(0);
1223                 if (open("/dev/null", O_RDONLY) != 0) {
1224                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1225                         exit(11);
1226                 }
1227         }
1228         ignore_signal(SIGPIPE);
1229         ignore_signal(SIGUSR1);
1230
1231         ctdb->ctdbd_pid = getpid();
1232         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1233                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1234         ctdb_create_pidfile(ctdb->ctdbd_pid);
1235
1236         /* Make sure we log something when the daemon terminates.
1237          * This must be the first exit handler to run (so the last to
1238          * be registered.
1239          */
1240         atexit(print_exit_message);
1241
1242         if (ctdb->do_setsched) {
1243                 /* try to set us up as realtime */
1244                 if (!set_scheduler()) {
1245                         exit(1);
1246                 }
1247                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1248         }
1249
1250         ctdb->ev = tevent_context_init(NULL);
1251         tevent_loop_allow_nesting(ctdb->ev);
1252         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1253         ret = ctdb_init_tevent_logging(ctdb);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1256                 exit(1);
1257         }
1258
1259         /* set up a handler to pick up sigchld */
1260         if (ctdb_init_sigchld(ctdb) == NULL) {
1261                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1262                 exit(1);
1263         }
1264
1265         ctdb_set_child_logging(ctdb);
1266
1267         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1268                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1269                 exit(1);
1270         }
1271
1272         /* initialize statistics collection */
1273         ctdb_statistics_init(ctdb);
1274
1275         /* force initial recovery for election */
1276         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1277
1278         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1279         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1280         if (ret != 0) {
1281                 ctdb_die(ctdb, "Failed to run init event\n");
1282         }
1283         ctdb_run_notification_script(ctdb, "init");
1284
1285         if (strcmp(ctdb->transport, "tcp") == 0) {
1286                 ret = ctdb_tcp_init(ctdb);
1287         }
1288 #ifdef USE_INFINIBAND
1289         if (strcmp(ctdb->transport, "ib") == 0) {
1290                 ret = ctdb_ibw_init(ctdb);
1291         }
1292 #endif
1293         if (ret != 0) {
1294                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1295                 return -1;
1296         }
1297
1298         if (ctdb->methods == NULL) {
1299                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1300                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1301         }
1302
1303         /* Initialise the transport.  This sets the node address if it
1304          * was not set via the command-line. */
1305         if (ctdb->methods->initialise(ctdb) != 0) {
1306                 ctdb_fatal(ctdb, "transport failed to initialise");
1307         }
1308
1309         ctdb_set_my_pnn(ctdb);
1310
1311         initialise_node_flags(ctdb);
1312
1313         if (ctdb->public_addresses_file) {
1314                 ret = ctdb_set_public_addresses(ctdb, true);
1315                 if (ret == -1) {
1316                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1317                         exit(1);
1318                 }
1319         }
1320
1321         ctdb_initialise_vnn_map(ctdb);
1322
1323         /* attach to existing databases */
1324         if (ctdb_attach_databases(ctdb) != 0) {
1325                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1326         }
1327
1328         /* start frozen, then let the first election sort things out */
1329         if (!ctdb_blocking_freeze(ctdb)) {
1330                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1331         }
1332
1333         /* now start accepting clients, only can do this once frozen */
1334         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1335                             ctdb_accept_client, ctdb);
1336         if (fde == NULL) {
1337                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1338         }
1339         tevent_fd_set_auto_close(fde);
1340
1341         /* Start the transport */
1342         if (ctdb->methods->start(ctdb) != 0) {
1343                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1344                 ctdb_fatal(ctdb, "transport failed to start");
1345         }
1346
1347         /* Recovery daemon and timed events are started from the
1348          * callback, only after the setup event completes
1349          * successfully.
1350          */
1351         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1352         ret = ctdb_event_script_callback(ctdb,
1353                                          ctdb,
1354                                          ctdb_setup_event_callback,
1355                                          ctdb,
1356                                          CTDB_EVENT_SETUP,
1357                                          "%s",
1358                                          "");
1359         if (ret != 0) {
1360                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1361                 exit(1);
1362         }
1363
1364         lockdown_memory(ctdb->valgrinding);
1365
1366         /* go into a wait loop to allow other nodes to complete */
1367         tevent_loop_wait(ctdb->ev);
1368
1369         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1370         exit(1);
1371 }
1372
1373 /*
1374   allocate a packet for use in daemon<->daemon communication
1375  */
1376 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1377                                                  TALLOC_CTX *mem_ctx, 
1378                                                  enum ctdb_operation operation, 
1379                                                  size_t length, size_t slength,
1380                                                  const char *type)
1381 {
1382         int size;
1383         struct ctdb_req_header *hdr;
1384
1385         length = MAX(length, slength);
1386         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1387
1388         if (ctdb->methods == NULL) {
1389                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1390                          operation, (unsigned)length));
1391                 return NULL;
1392         }
1393
1394         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1395         if (hdr == NULL) {
1396                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1397                          operation, (unsigned)length));
1398                 return NULL;
1399         }
1400         talloc_set_name_const(hdr, type);
1401         memset(hdr, 0, slength);
1402         hdr->length       = length;
1403         hdr->operation    = operation;
1404         hdr->ctdb_magic   = CTDB_MAGIC;
1405         hdr->ctdb_version = CTDB_PROTOCOL;
1406         hdr->generation   = ctdb->vnn_map->generation;
1407         hdr->srcnode      = ctdb->pnn;
1408
1409         return hdr;     
1410 }
1411
1412 struct daemon_control_state {
1413         struct daemon_control_state *next, *prev;
1414         struct ctdb_client *client;
1415         struct ctdb_req_control *c;
1416         uint32_t reqid;
1417         struct ctdb_node *node;
1418 };
1419
1420 /*
1421   callback when a control reply comes in
1422  */
1423 static void daemon_control_callback(struct ctdb_context *ctdb,
1424                                     int32_t status, TDB_DATA data, 
1425                                     const char *errormsg,
1426                                     void *private_data)
1427 {
1428         struct daemon_control_state *state = talloc_get_type(private_data, 
1429                                                              struct daemon_control_state);
1430         struct ctdb_client *client = state->client;
1431         struct ctdb_reply_control *r;
1432         size_t len;
1433         int ret;
1434
1435         /* construct a message to send to the client containing the data */
1436         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1437         if (errormsg) {
1438                 len += strlen(errormsg);
1439         }
1440         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1441                                struct ctdb_reply_control);
1442         CTDB_NO_MEMORY_VOID(ctdb, r);
1443
1444         r->hdr.reqid     = state->reqid;
1445         r->status        = status;
1446         r->datalen       = data.dsize;
1447         r->errorlen = 0;
1448         memcpy(&r->data[0], data.dptr, data.dsize);
1449         if (errormsg) {
1450                 r->errorlen = strlen(errormsg);
1451                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1452         }
1453
1454         ret = daemon_queue_send(client, &r->hdr);
1455         if (ret != -1) {
1456                 talloc_free(state);
1457         }
1458 }
1459
1460 /*
1461   fail all pending controls to a disconnected node
1462  */
1463 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1464 {
1465         struct daemon_control_state *state;
1466         while ((state = node->pending_controls)) {
1467                 DLIST_REMOVE(node->pending_controls, state);
1468                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1469                                         "node is disconnected", state);
1470         }
1471 }
1472
1473 /*
1474   destroy a daemon_control_state
1475  */
1476 static int daemon_control_destructor(struct daemon_control_state *state)
1477 {
1478         if (state->node) {
1479                 DLIST_REMOVE(state->node->pending_controls, state);
1480         }
1481         return 0;
1482 }
1483
1484 /*
1485   this is called when the ctdb daemon received a ctdb request control
1486   from a local client over the unix domain socket
1487  */
1488 static void daemon_request_control_from_client(struct ctdb_client *client, 
1489                                                struct ctdb_req_control *c)
1490 {
1491         TDB_DATA data;
1492         int res;
1493         struct daemon_control_state *state;
1494         TALLOC_CTX *tmp_ctx = talloc_new(client);
1495
1496         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1497                 c->hdr.destnode = client->ctdb->pnn;
1498         }
1499
1500         state = talloc(client, struct daemon_control_state);
1501         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1502
1503         state->client = client;
1504         state->c = talloc_steal(state, c);
1505         state->reqid = c->hdr.reqid;
1506         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1507                 state->node = client->ctdb->nodes[c->hdr.destnode];
1508                 DLIST_ADD(state->node->pending_controls, state);
1509         } else {
1510                 state->node = NULL;
1511         }
1512
1513         talloc_set_destructor(state, daemon_control_destructor);
1514
1515         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1516                 talloc_steal(tmp_ctx, state);
1517         }
1518         
1519         data.dptr = &c->data[0];
1520         data.dsize = c->datalen;
1521         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1522                                        c->srvid, c->opcode, client->client_id,
1523                                        c->flags,
1524                                        data, daemon_control_callback,
1525                                        state);
1526         if (res != 0) {
1527                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1528                          c->hdr.destnode));
1529         }
1530
1531         talloc_free(tmp_ctx);
1532 }
1533
1534 /*
1535   register a call function
1536 */
1537 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1538                          ctdb_fn_t fn, int id)
1539 {
1540         struct ctdb_registered_call *call;
1541         struct ctdb_db_context *ctdb_db;
1542
1543         ctdb_db = find_ctdb_db(ctdb, db_id);
1544         if (ctdb_db == NULL) {
1545                 return -1;
1546         }
1547
1548         call = talloc(ctdb_db, struct ctdb_registered_call);
1549         call->fn = fn;
1550         call->id = id;
1551
1552         DLIST_ADD(ctdb_db->calls, call);        
1553         return 0;
1554 }
1555
1556
1557
1558 /*
1559   this local messaging handler is ugly, but is needed to prevent
1560   recursion in ctdb_send_message() when the destination node is the
1561   same as the source node
1562  */
1563 struct ctdb_local_message {
1564         struct ctdb_context *ctdb;
1565         uint64_t srvid;
1566         TDB_DATA data;
1567 };
1568
1569 static void ctdb_local_message_trigger(struct tevent_context *ev,
1570                                        struct tevent_timer *te,
1571                                        struct timeval t, void *private_data)
1572 {
1573         struct ctdb_local_message *m = talloc_get_type(
1574                 private_data, struct ctdb_local_message);
1575
1576         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1577         talloc_free(m);
1578 }
1579
1580 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1581 {
1582         struct ctdb_local_message *m;
1583         m = talloc(ctdb, struct ctdb_local_message);
1584         CTDB_NO_MEMORY(ctdb, m);
1585
1586         m->ctdb = ctdb;
1587         m->srvid = srvid;
1588         m->data  = data;
1589         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1590         if (m->data.dptr == NULL) {
1591                 talloc_free(m);
1592                 return -1;
1593         }
1594
1595         /* this needs to be done as an event to prevent recursion */
1596         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1597                          ctdb_local_message_trigger, m);
1598         return 0;
1599 }
1600
1601 /*
1602   send a ctdb message
1603 */
1604 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1605                              uint64_t srvid, TDB_DATA data)
1606 {
1607         struct ctdb_req_message *r;
1608         int len;
1609
1610         if (ctdb->methods == NULL) {
1611                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1612                 return -1;
1613         }
1614
1615         /* see if this is a message to ourselves */
1616         if (pnn == ctdb->pnn) {
1617                 return ctdb_local_message(ctdb, srvid, data);
1618         }
1619
1620         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1621         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1622                                     struct ctdb_req_message);
1623         CTDB_NO_MEMORY(ctdb, r);
1624
1625         r->hdr.destnode  = pnn;
1626         r->srvid         = srvid;
1627         r->datalen       = data.dsize;
1628         memcpy(&r->data[0], data.dptr, data.dsize);
1629
1630         ctdb_queue_packet(ctdb, &r->hdr);
1631
1632         talloc_free(r);
1633         return 0;
1634 }
1635
1636
1637
1638 struct ctdb_client_notify_list {
1639         struct ctdb_client_notify_list *next, *prev;
1640         struct ctdb_context *ctdb;
1641         uint64_t srvid;
1642         TDB_DATA data;
1643 };
1644
1645
1646 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1647 {
1648         int ret;
1649
1650         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1651
1652         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1653         if (ret != 0) {
1654                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1655         }
1656
1657         return 0;
1658 }
1659
1660 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1661 {
1662         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1663         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1664         struct ctdb_client_notify_list *nl;
1665
1666         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1667
1668         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1669                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1670                 return -1;
1671         }
1672
1673         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1674                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1675                 return -1;
1676         }
1677
1678
1679         if (client == NULL) {
1680                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1681                 return -1;
1682         }
1683
1684         for(nl=client->notify; nl; nl=nl->next) {
1685                 if (nl->srvid == notify->srvid) {
1686                         break;
1687                 }
1688         }
1689         if (nl != NULL) {
1690                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1691                 return -1;
1692         }
1693
1694         nl = talloc(client, struct ctdb_client_notify_list);
1695         CTDB_NO_MEMORY(ctdb, nl);
1696         nl->ctdb       = ctdb;
1697         nl->srvid      = notify->srvid;
1698         nl->data.dsize = notify->len;
1699         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1700         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1701         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1702         
1703         DLIST_ADD(client->notify, nl);
1704         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1705
1706         return 0;
1707 }
1708
1709 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1710 {
1711         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1712         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1713         struct ctdb_client_notify_list *nl;
1714
1715         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1716
1717         if (client == NULL) {
1718                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1719                 return -1;
1720         }
1721
1722         for(nl=client->notify; nl; nl=nl->next) {
1723                 if (nl->srvid == notify->srvid) {
1724                         break;
1725                 }
1726         }
1727         if (nl == NULL) {
1728                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1729                 return -1;
1730         }
1731
1732         DLIST_REMOVE(client->notify, nl);
1733         talloc_set_destructor(nl, NULL);
1734         talloc_free(nl);
1735
1736         return 0;
1737 }
1738
1739 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1740 {
1741         struct ctdb_client_pid_list *client_pid;
1742
1743         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1744                 if (client_pid->pid == pid) {
1745                         return client_pid->client;
1746                 }
1747         }
1748         return NULL;
1749 }
1750
1751
1752 /* This control is used by samba when probing if a process (of a samba daemon)
1753    exists on the node.
1754    Samba does this when it needs/wants to check if a subrecord in one of the
1755    databases is still valied, or if it is stale and can be removed.
1756    If the node is in unhealthy or stopped state we just kill of the samba
1757    process holding htis sub-record and return to the calling samba that
1758    the process does not exist.
1759    This allows us to forcefully recall subrecords registered by samba processes
1760    on banned and stopped nodes.
1761 */
1762 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1763 {
1764         struct ctdb_client *client;
1765
1766         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1767                 client = ctdb_find_client_by_pid(ctdb, pid);
1768                 if (client != NULL) {
1769                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1770                         talloc_free(client);
1771                 }
1772                 return -1;
1773         }
1774
1775         return kill(pid, 0);
1776 }
1777
1778 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1779 {
1780         struct ctdb_node_map *node_map = NULL;
1781
1782         CHECK_CONTROL_DATA_SIZE(0);
1783
1784         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1785         if (node_map == NULL) {
1786                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1787                 return -1;
1788         }
1789
1790         outdata->dptr  = (unsigned char *)node_map;
1791         outdata->dsize = talloc_get_size(outdata->dptr);
1792
1793         return 0;
1794 }
1795
1796 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1797 {
1798         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1799                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1800                 return;
1801         }
1802
1803         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1804         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1805         ctdb_stop_recoverd(ctdb);
1806         ctdb_stop_keepalive(ctdb);
1807         ctdb_stop_monitoring(ctdb);
1808         ctdb_release_all_ips(ctdb);
1809         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1810         if (ctdb->methods != NULL) {
1811                 ctdb->methods->shutdown(ctdb);
1812         }
1813
1814         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1815         exit(exit_code);
1816 }