ctdb-daemon: Fix CID 1362723 Unchecked return value from library
[sfrench/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/blocking.h"
37
38 #include "ctdb_version.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
41
42 #include "common/rb_tree.h"
43 #include "common/reqid.h"
44 #include "common/system.h"
45 #include "common/common.h"
46 #include "common/logging.h"
47
48 struct ctdb_client_pid_list {
49         struct ctdb_client_pid_list *next, *prev;
50         struct ctdb_context *ctdb;
51         pid_t pid;
52         struct ctdb_client *client;
53 };
54
55 const char *ctdbd_pidfile = NULL;
56
57 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
58
59 static void print_exit_message(void)
60 {
61         if (debug_extra != NULL && debug_extra[0] != '\0') {
62                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
63         } else {
64                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
65
66                 /* Wait a second to allow pending log messages to be flushed */
67                 sleep(1);
68         }
69 }
70
71
72
73 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
74                                   struct timeval t, void *private_data)
75 {
76         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
77
78         if (getpid() != ctdb->ctdbd_pid) {
79                 return;
80         }
81
82         tevent_add_timer(ctdb->ev, ctdb,
83                          timeval_current_ofs(1, 0),
84                          ctdb_time_tick, ctdb);
85 }
86
87 /* Used to trigger a dummy event once per second, to make
88  * detection of hangs more reliable.
89  */
90 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
91 {
92         tevent_add_timer(ctdb->ev, ctdb,
93                          timeval_current_ofs(1, 0),
94                          ctdb_time_tick, ctdb);
95 }
96
97 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
98 {
99         /* start monitoring for connected/disconnected nodes */
100         ctdb_start_keepalive(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void ignore_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
146                                    void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message_old *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
154         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
155                                len, struct ctdb_req_message_old);
156         CTDB_NO_MEMORY_VOID(client->ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
182                              client);
183         if (res != 0) {
184                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
185                          (unsigned long long)srvid));
186         } else {
187                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
188                          (unsigned long long)srvid));
189         }
190
191         return res;
192 }
193
194 /*
195   this is called when the ctdb daemon received a ctdb request to 
196   remove a srvid from the client
197  */
198 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 {
200         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
201         if (client == NULL) {
202                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
203                 return -1;
204         }
205         return srvid_deregister(ctdb->srv, srvid, client);
206 }
207
208 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
209                         TDB_DATA *outdata)
210 {
211         uint64_t *ids;
212         int i, num_ids;
213         uint8_t *results;
214
215         if ((indata.dsize % sizeof(uint64_t)) != 0) {
216                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
217                                   "size=%d\n", (int)indata.dsize));
218                 return -1;
219         }
220
221         ids = (uint64_t *)indata.dptr;
222         num_ids = indata.dsize / 8;
223
224         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
225         if (results == NULL) {
226                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
227                 return -1;
228         }
229         for (i=0; i<num_ids; i++) {
230                 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
231                         results[i/8] |= (1 << (i%8));
232                 }
233         }
234         outdata->dptr = (uint8_t *)results;
235         outdata->dsize = talloc_get_size(results);
236         return 0;
237 }
238
239 /*
240   destroy a ctdb_client
241 */
242 static int ctdb_client_destructor(struct ctdb_client *client)
243 {
244         struct ctdb_db_context *ctdb_db;
245
246         ctdb_takeover_client_destructor_hook(client);
247         reqid_remove(client->ctdb->idr, client->client_id);
248         client->ctdb->num_clients--;
249
250         if (client->num_persistent_updates != 0) {
251                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
252                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
253         }
254         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
255         if (ctdb_db) {
256                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
257                                   "commit active. Forcing recovery.\n"));
258                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
259
260                 /*
261                  * trans3 transaction state:
262                  *
263                  * The destructor sets the pointer to NULL.
264                  */
265                 talloc_free(ctdb_db->persistent_state);
266         }
267
268         return 0;
269 }
270
271
272 /*
273   this is called when the ctdb daemon received a ctdb request message
274   from a local client over the unix domain socket
275  */
276 static void daemon_request_message_from_client(struct ctdb_client *client, 
277                                                struct ctdb_req_message_old *c)
278 {
279         TDB_DATA data;
280         int res;
281
282         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
283                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
284         }
285
286         /* maybe the message is for another client on this node */
287         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
288                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
289                 return;
290         }
291
292         /* its for a remote node */
293         data.dptr = &c->data[0];
294         data.dsize = c->datalen;
295         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
296                                        c->srvid, data);
297         if (res != 0) {
298                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
299                          c->hdr.destnode));
300         }
301 }
302
303
304 struct daemon_call_state {
305         struct ctdb_client *client;
306         uint32_t reqid;
307         struct ctdb_call *call;
308         struct timeval start_time;
309
310         /* readonly request ? */
311         uint32_t readonly_fetch;
312         uint32_t client_callid;
313 };
314
315 /* 
316    complete a call from a client 
317 */
318 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
319 {
320         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
321                                                            struct daemon_call_state);
322         struct ctdb_reply_call_old *r;
323         int res;
324         uint32_t length;
325         struct ctdb_client *client = dstate->client;
326         struct ctdb_db_context *ctdb_db = state->ctdb_db;
327
328         talloc_steal(client, dstate);
329         talloc_steal(dstate, dstate->call);
330
331         res = ctdb_daemon_call_recv(state, dstate->call);
332         if (res != 0) {
333                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
334                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
335
336                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
337                 return;
338         }
339
340         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
341         /* If the client asked for readonly FETCH, we remapped this to 
342            FETCH_WITH_HEADER when calling the daemon. So we must
343            strip the extra header off the reply data before passing
344            it back to the client.
345         */
346         if (dstate->readonly_fetch
347         && dstate->client_callid == CTDB_FETCH_FUNC) {
348                 length -= sizeof(struct ctdb_ltdb_header);
349         }
350
351         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
352                                length, struct ctdb_reply_call_old);
353         if (r == NULL) {
354                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
355                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
356                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
357                 return;
358         }
359         r->hdr.reqid        = dstate->reqid;
360         r->status           = dstate->call->status;
361
362         if (dstate->readonly_fetch
363         && dstate->client_callid == CTDB_FETCH_FUNC) {
364                 /* client only asked for a FETCH so we must strip off
365                    the extra ctdb_ltdb header
366                 */
367                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
368                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
369         } else {
370                 r->datalen          = dstate->call->reply_data.dsize;
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
372         }
373
374         res = daemon_queue_send(client, &r->hdr);
375         if (res == -1) {
376                 /* client is dead - return immediately */
377                 return;
378         }
379         if (res != 0) {
380                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
381         }
382         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
383         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
384         talloc_free(dstate);
385 }
386
387 struct ctdb_daemon_packet_wrap {
388         struct ctdb_context *ctdb;
389         uint32_t client_id;
390 };
391
392 /*
393   a wrapper to catch disconnected clients
394  */
395 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
396 {
397         struct ctdb_client *client;
398         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
399                                                             struct ctdb_daemon_packet_wrap);
400         if (w == NULL) {
401                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
402                 return;
403         }
404
405         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
406         if (client == NULL) {
407                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
408                          w->client_id));
409                 talloc_free(w);
410                 return;
411         }
412         talloc_free(w);
413
414         /* process it */
415         daemon_incoming_packet(client, hdr);    
416 }
417
418 struct ctdb_deferred_fetch_call {
419         struct ctdb_deferred_fetch_call *next, *prev;
420         struct ctdb_req_call_old *c;
421         struct ctdb_daemon_packet_wrap *w;
422 };
423
424 struct ctdb_deferred_fetch_queue {
425         struct ctdb_deferred_fetch_call *deferred_calls;
426 };
427
428 struct ctdb_deferred_requeue {
429         struct ctdb_deferred_fetch_call *dfc;
430         struct ctdb_client *client;
431 };
432
433 /* called from a timer event and starts reprocessing the deferred call.*/
434 static void reprocess_deferred_call(struct tevent_context *ev,
435                                     struct tevent_timer *te,
436                                     struct timeval t, void *private_data)
437 {
438         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
439         struct ctdb_client *client = dfr->client;
440
441         talloc_steal(client, dfr->dfc->c);
442         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
443         talloc_free(dfr);
444 }
445
446 /* the referral context is destroyed either after a timeout or when the initial
447    fetch-lock has finished.
448    at this stage, immediately start reprocessing the queued up deferred
449    calls so they get reprocessed immediately (and since we are dmaster at
450    this stage, trigger the waiting smbd processes to pick up and aquire the
451    record right away.
452 */
453 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
454 {
455
456         /* need to reprocess the packets from the queue explicitely instead of
457            just using a normal destructor since we want, need, to
458            call the clients in the same oder as the requests queued up
459         */
460         while (dfq->deferred_calls != NULL) {
461                 struct ctdb_client *client;
462                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
463                 struct ctdb_deferred_requeue *dfr;
464
465                 DLIST_REMOVE(dfq->deferred_calls, dfc);
466
467                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
468                 if (client == NULL) {
469                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
470                                  dfc->w->client_id));
471                         continue;
472                 }
473
474                 /* process it by pushing it back onto the eventloop */
475                 dfr = talloc(client, struct ctdb_deferred_requeue);
476                 if (dfr == NULL) {
477                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
478                         continue;
479                 }
480
481                 dfr->dfc    = talloc_steal(dfr, dfc);
482                 dfr->client = client;
483
484                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
485                                  reprocess_deferred_call, dfr);
486         }
487
488         return 0;
489 }
490
491 /* insert the new deferral context into the rb tree.
492    there should never be a pre-existing context here, but check for it
493    warn and destroy the previous context if there is already a deferral context
494    for this key.
495 */
496 static void *insert_dfq_callback(void *parm, void *data)
497 {
498         if (data) {
499                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
500                 talloc_free(data);
501         }
502         return parm;
503 }
504
505 /* if the original fetch-lock did not complete within a reasonable time,
506    free the context and context for all deferred requests to cause them to be
507    re-inserted into the event system.
508 */
509 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
510                         struct timeval t, void *private_data)
511 {
512         talloc_free(private_data);
513 }
514
515 /* This function is used in the local daemon to register a KEY in a database
516    for being "fetched"
517    While the remote fetch is in-flight, any futher attempts to re-fetch the
518    same record will be deferred until the fetch completes.
519 */
520 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
521 {
522         uint32_t *k;
523         struct ctdb_deferred_fetch_queue *dfq;
524
525         k = ctdb_key_to_idkey(call, call->key);
526         if (k == NULL) {
527                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
528                 return -1;
529         }
530
531         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
532         if (dfq == NULL) {
533                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
534                 talloc_free(k);
535                 return -1;
536         }
537         dfq->deferred_calls = NULL;
538
539         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
540
541         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
542
543         /* if the fetch havent completed in 30 seconds, just tear it all down
544            and let it try again as the events are reissued */
545         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
546                          dfq_timeout, dfq);
547
548         talloc_free(k);
549         return 0;
550 }
551
552 /* check if this is a duplicate request to a fetch already in-flight
553    if it is, make this call deferred to be reprocessed later when
554    the in-flight fetch completes.
555 */
556 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
557 {
558         uint32_t *k;
559         struct ctdb_deferred_fetch_queue *dfq;
560         struct ctdb_deferred_fetch_call *dfc;
561
562         k = ctdb_key_to_idkey(c, key);
563         if (k == NULL) {
564                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
565                 return -1;
566         }
567
568         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
569         if (dfq == NULL) {
570                 talloc_free(k);
571                 return -1;
572         }
573
574
575         talloc_free(k);
576
577         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
578         if (dfc == NULL) {
579                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
580                 return -1;
581         }
582
583         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
584         if (dfc->w == NULL) {
585                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
586                 talloc_free(dfc);
587                 return -1;
588         }
589
590         dfc->c = talloc_steal(dfc, c);
591         dfc->w->ctdb = ctdb_db->ctdb;
592         dfc->w->client_id = client->client_id;
593
594         DLIST_ADD_END(dfq->deferred_calls, dfc);
595
596         return 0;
597 }
598
599
600 /*
601   this is called when the ctdb daemon received a ctdb request call
602   from a local client over the unix domain socket
603  */
604 static void daemon_request_call_from_client(struct ctdb_client *client, 
605                                             struct ctdb_req_call_old *c)
606 {
607         struct ctdb_call_state *state;
608         struct ctdb_db_context *ctdb_db;
609         struct daemon_call_state *dstate;
610         struct ctdb_call *call;
611         struct ctdb_ltdb_header header;
612         TDB_DATA key, data;
613         int ret;
614         struct ctdb_context *ctdb = client->ctdb;
615         struct ctdb_daemon_packet_wrap *w;
616
617         CTDB_INCREMENT_STAT(ctdb, total_calls);
618         CTDB_INCREMENT_STAT(ctdb, pending_calls);
619
620         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
621         if (!ctdb_db) {
622                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
623                           c->db_id));
624                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
625                 return;
626         }
627
628         if (ctdb_db->unhealthy_reason) {
629                 /*
630                  * this is just a warning, as the tdb should be empty anyway,
631                  * and only persistent databases can be unhealthy, which doesn't
632                  * use this code patch
633                  */
634                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
635                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
636         }
637
638         key.dptr = c->data;
639         key.dsize = c->keylen;
640
641         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
642         CTDB_NO_MEMORY_VOID(ctdb, w);   
643
644         w->ctdb = ctdb;
645         w->client_id = client->client_id;
646
647         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
648                                            (struct ctdb_req_header *)c, &data,
649                                            daemon_incoming_packet_wrap, w, true);
650         if (ret == -2) {
651                 /* will retry later */
652                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
653                 return;
654         }
655
656         talloc_free(w);
657
658         if (ret != 0) {
659                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
660                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
661                 return;
662         }
663
664
665         /* check if this fetch request is a duplicate for a
666            request we already have in flight. If so defer it until
667            the first request completes.
668         */
669         if (ctdb->tunable.fetch_collapse == 1) {
670                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
671                         ret = ctdb_ltdb_unlock(ctdb_db, key);
672                         if (ret != 0) {
673                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
674                         }
675                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
676                         return;
677                 }
678         }
679
680         /* Dont do READONLY if we don't have a tracking database */
681         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
682                 c->flags &= ~CTDB_WANT_READONLY;
683         }
684
685         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
686                 header.flags &= ~CTDB_REC_RO_FLAGS;
687                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
688                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
689                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
690                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
691                 }
692                 /* and clear out the tracking data */
693                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
694                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
695                 }
696         }
697
698         /* if we are revoking, we must defer all other calls until the revoke
699          * had completed.
700          */
701         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
702                 talloc_free(data.dptr);
703                 ret = ctdb_ltdb_unlock(ctdb_db, key);
704
705                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
706                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
707                 }
708                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
709                 return;
710         }
711
712         if ((header.dmaster == ctdb->pnn)
713         && (!(c->flags & CTDB_WANT_READONLY))
714         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
715                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
716                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
717                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
718                 }
719                 ret = ctdb_ltdb_unlock(ctdb_db, key);
720
721                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
722                         ctdb_fatal(ctdb, "Failed to start record revoke");
723                 }
724                 talloc_free(data.dptr);
725
726                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
727                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
728                 }
729
730                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
731                 return;
732         }               
733
734         dstate = talloc(client, struct daemon_call_state);
735         if (dstate == NULL) {
736                 ret = ctdb_ltdb_unlock(ctdb_db, key);
737                 if (ret != 0) {
738                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
739                 }
740
741                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
742                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
743                 return;
744         }
745         dstate->start_time = timeval_current();
746         dstate->client = client;
747         dstate->reqid  = c->hdr.reqid;
748         talloc_steal(dstate, data.dptr);
749
750         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
751         if (call == NULL) {
752                 ret = ctdb_ltdb_unlock(ctdb_db, key);
753                 if (ret != 0) {
754                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
755                 }
756
757                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
758                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
759                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
760                 return;
761         }
762
763         dstate->readonly_fetch = 0;
764         call->call_id = c->callid;
765         call->key = key;
766         call->call_data.dptr = c->data + c->keylen;
767         call->call_data.dsize = c->calldatalen;
768         call->flags = c->flags;
769
770         if (c->flags & CTDB_WANT_READONLY) {
771                 /* client wants readonly record, so translate this into a 
772                    fetch with header. remember what the client asked for
773                    so we can remap the reply back to the proper format for
774                    the client in the reply
775                  */
776                 dstate->client_callid = call->call_id;
777                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
778                 dstate->readonly_fetch = 1;
779         }
780
781         if (header.dmaster == ctdb->pnn) {
782                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
783         } else {
784                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
785                 if (ctdb->tunable.fetch_collapse == 1) {
786                         /* This request triggered a remote fetch-lock.
787                            set up a deferral for this key so any additional
788                            fetch-locks are deferred until the current one
789                            finishes.
790                          */
791                         setup_deferred_fetch_locks(ctdb_db, call);
792                 }
793         }
794
795         ret = ctdb_ltdb_unlock(ctdb_db, key);
796         if (ret != 0) {
797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
798         }
799
800         if (state == NULL) {
801                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
802                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
803                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
804                 return;
805         }
806         talloc_steal(state, dstate);
807         talloc_steal(client, state);
808
809         state->async.fn = daemon_call_from_client_callback;
810         state->async.private_data = dstate;
811 }
812
813
814 static void daemon_request_control_from_client(struct ctdb_client *client, 
815                                                struct ctdb_req_control_old *c);
816
817 /* data contains a packet from the client */
818 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
819 {
820         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
821         TALLOC_CTX *tmp_ctx;
822         struct ctdb_context *ctdb = client->ctdb;
823
824         /* place the packet as a child of a tmp_ctx. We then use
825            talloc_free() below to free it. If any of the calls want
826            to keep it, then they will steal it somewhere else, and the
827            talloc_free() will be a no-op */
828         tmp_ctx = talloc_new(client);
829         talloc_steal(tmp_ctx, hdr);
830
831         if (hdr->ctdb_magic != CTDB_MAGIC) {
832                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
833                 goto done;
834         }
835
836         if (hdr->ctdb_version != CTDB_PROTOCOL) {
837                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
838                 goto done;
839         }
840
841         switch (hdr->operation) {
842         case CTDB_REQ_CALL:
843                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
844                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
845                 break;
846
847         case CTDB_REQ_MESSAGE:
848                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
849                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
850                 break;
851
852         case CTDB_REQ_CONTROL:
853                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
854                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
855                 break;
856
857         default:
858                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
859                          hdr->operation));
860         }
861
862 done:
863         talloc_free(tmp_ctx);
864 }
865
866 /*
867   called when the daemon gets a incoming packet
868  */
869 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
870 {
871         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
872         struct ctdb_req_header *hdr;
873
874         if (cnt == 0) {
875                 talloc_free(client);
876                 return;
877         }
878
879         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
880
881         if (cnt < sizeof(*hdr)) {
882                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
883                                (unsigned)cnt);
884                 return;
885         }
886         hdr = (struct ctdb_req_header *)data;
887         if (cnt != hdr->length) {
888                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
889                                (unsigned)hdr->length, (unsigned)cnt);
890                 return;
891         }
892
893         if (hdr->ctdb_magic != CTDB_MAGIC) {
894                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
895                 return;
896         }
897
898         if (hdr->ctdb_version != CTDB_PROTOCOL) {
899                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
900                 return;
901         }
902
903         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
904                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
905                  hdr->srcnode, hdr->destnode));
906
907         /* it is the responsibility of the incoming packet function to free 'data' */
908         daemon_incoming_packet(client, hdr);
909 }
910
911
912 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
913 {
914         if (client_pid->ctdb->client_pids != NULL) {
915                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
916         }
917
918         return 0;
919 }
920
921
922 static void ctdb_accept_client(struct tevent_context *ev,
923                                struct tevent_fd *fde, uint16_t flags,
924                                void *private_data)
925 {
926         struct sockaddr_un addr;
927         socklen_t len;
928         int fd;
929         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
930         struct ctdb_client *client;
931         struct ctdb_client_pid_list *client_pid;
932         pid_t peer_pid = 0;
933         int ret;
934
935         memset(&addr, 0, sizeof(addr));
936         len = sizeof(addr);
937         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
938         if (fd == -1) {
939                 return;
940         }
941
942         ret = set_blocking(fd, false);
943         if (ret != 0) {
944                 DEBUG(DEBUG_ERR,
945                       (__location__
946                        " failed to set socket non-blocking (%s)\n",
947                        strerror(errno)));
948                 close(fd);
949                 return;
950         }
951
952         set_close_on_exec(fd);
953
954         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
955
956         client = talloc_zero(ctdb, struct ctdb_client);
957         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
958                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
959         }
960
961         client->ctdb = ctdb;
962         client->fd = fd;
963         client->client_id = reqid_new(ctdb->idr, client);
964         client->pid = peer_pid;
965
966         client_pid = talloc(client, struct ctdb_client_pid_list);
967         if (client_pid == NULL) {
968                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
969                 close(fd);
970                 talloc_free(client);
971                 return;
972         }               
973         client_pid->ctdb   = ctdb;
974         client_pid->pid    = peer_pid;
975         client_pid->client = client;
976
977         DLIST_ADD(ctdb->client_pids, client_pid);
978
979         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
980                                          ctdb_daemon_read_cb, client,
981                                          "client-%u", client->pid);
982
983         talloc_set_destructor(client, ctdb_client_destructor);
984         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
985         ctdb->num_clients++;
986 }
987
988
989
990 /*
991   create a unix domain socket and bind it
992   return a file descriptor open on the socket 
993 */
994 static int ux_socket_bind(struct ctdb_context *ctdb)
995 {
996         struct sockaddr_un addr;
997         int ret;
998
999         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1000         if (ctdb->daemon.sd == -1) {
1001                 return -1;
1002         }
1003
1004         memset(&addr, 0, sizeof(addr));
1005         addr.sun_family = AF_UNIX;
1006         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1007
1008         /* First check if an old ctdbd might be running */
1009         if (connect(ctdb->daemon.sd,
1010                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
1011                 DEBUG(DEBUG_CRIT,
1012                       ("Something is already listening on ctdb socket '%s'\n",
1013                        ctdb->daemon.name));
1014                 goto failed;
1015         }
1016
1017         /* Remove any old socket */
1018         unlink(ctdb->daemon.name);
1019
1020         set_close_on_exec(ctdb->daemon.sd);
1021
1022         ret = set_blocking(ctdb->daemon.sd, false);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_ERR,
1025                       (__location__
1026                        " failed to set socket non-blocking (%s)\n",
1027                        strerror(errno)));
1028                 goto failed;
1029         }
1030
1031         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1032                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1033                 goto failed;
1034         }
1035
1036         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1037             chmod(ctdb->daemon.name, 0700) != 0) {
1038                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1039                 goto failed;
1040         }
1041
1042
1043         if (listen(ctdb->daemon.sd, 100) != 0) {
1044                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1045                 goto failed;
1046         }
1047
1048         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1049                              ctdb->daemon.name));
1050         return 0;
1051
1052 failed:
1053         close(ctdb->daemon.sd);
1054         ctdb->daemon.sd = -1;
1055         return -1;      
1056 }
1057
1058 static void initialise_node_flags (struct ctdb_context *ctdb)
1059 {
1060         if (ctdb->pnn == -1) {
1061                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1062         }
1063
1064         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1065
1066         /* do we start out in DISABLED mode? */
1067         if (ctdb->start_as_disabled != 0) {
1068                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1069                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1070         }
1071         /* do we start out in STOPPED mode? */
1072         if (ctdb->start_as_stopped != 0) {
1073                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1074                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1075         }
1076 }
1077
1078 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1079                                       void *private_data)
1080 {
1081         if (status != 0) {
1082                 ctdb_die(ctdb, "Failed to run setup event");
1083         }
1084         ctdb_run_notification_script(ctdb, "setup");
1085
1086         /* tell all other nodes we've just started up */
1087         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1088                                  0, CTDB_CONTROL_STARTUP, 0,
1089                                  CTDB_CTRL_FLAG_NOREPLY,
1090                                  tdb_null, NULL, NULL);
1091
1092         /* Start the recovery daemon */
1093         if (ctdb_start_recoverd(ctdb) != 0) {
1094                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1095                 exit(11);
1096         }
1097
1098         ctdb_start_periodic_events(ctdb);
1099
1100         ctdb_wait_for_first_recovery(ctdb);
1101 }
1102
1103 static struct timeval tevent_before_wait_ts;
1104 static struct timeval tevent_after_wait_ts;
1105
1106 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1107                               void *private_data)
1108 {
1109         struct timeval diff;
1110         struct timeval now;
1111         struct ctdb_context *ctdb =
1112                 talloc_get_type(private_data, struct ctdb_context);
1113
1114         if (getpid() != ctdb->ctdbd_pid) {
1115                 return;
1116         }
1117
1118         now = timeval_current();
1119
1120         switch (tp) {
1121         case TEVENT_TRACE_BEFORE_WAIT:
1122                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1123                         diff = timeval_until(&tevent_after_wait_ts, &now);
1124                         if (diff.tv_sec > 3) {
1125                                 DEBUG(DEBUG_ERR,
1126                                       ("Handling event took %ld seconds!\n",
1127                                        (long)diff.tv_sec));
1128                         }
1129                 }
1130                 tevent_before_wait_ts = now;
1131                 break;
1132
1133         case TEVENT_TRACE_AFTER_WAIT:
1134                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1135                         diff = timeval_until(&tevent_before_wait_ts, &now);
1136                         if (diff.tv_sec > 3) {
1137                                 DEBUG(DEBUG_CRIT,
1138                                       ("No event for %ld seconds!\n",
1139                                        (long)diff.tv_sec));
1140                         }
1141                 }
1142                 tevent_after_wait_ts = now;
1143                 break;
1144
1145         default:
1146                 /* Do nothing for future tevent trace points */ ;
1147         }
1148 }
1149
1150 static void ctdb_remove_pidfile(void)
1151 {
1152         /* Only the main ctdbd's PID matches the SID */
1153         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1154                 if (unlink(ctdbd_pidfile) == 0) {
1155                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1156                                              ctdbd_pidfile));
1157                 } else {
1158                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1159                                               ctdbd_pidfile));
1160                 }
1161         }
1162 }
1163
1164 static void ctdb_create_pidfile(pid_t pid)
1165 {
1166         if (ctdbd_pidfile != NULL) {
1167                 FILE *fp;
1168
1169                 fp = fopen(ctdbd_pidfile, "w");
1170                 if (fp == NULL) {
1171                         DEBUG(DEBUG_ALERT,
1172                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1173                         exit(11);
1174                 }
1175
1176                 fprintf(fp, "%d\n", pid);
1177                 fclose(fp);
1178                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1179                 atexit(ctdb_remove_pidfile);
1180         }
1181 }
1182
1183 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1184 {
1185         int i, j, count;
1186
1187         /* initialize the vnn mapping table, skipping any deleted nodes */
1188         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1189         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1190
1191         count = 0;
1192         for (i = 0; i < ctdb->num_nodes; i++) {
1193                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1194                         count++;
1195                 }
1196         }
1197
1198         ctdb->vnn_map->generation = INVALID_GENERATION;
1199         ctdb->vnn_map->size = count;
1200         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1201         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1202
1203         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1204                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1205                         continue;
1206                 }
1207                 ctdb->vnn_map->map[j] = i;
1208                 j++;
1209         }
1210 }
1211
1212 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1213 {
1214         int nodeid;
1215
1216         if (ctdb->address == NULL) {
1217                 ctdb_fatal(ctdb,
1218                            "Can not determine PNN - node address is not set\n");
1219         }
1220
1221         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1222         if (nodeid == -1) {
1223                 ctdb_fatal(ctdb,
1224                            "Can not determine PNN - node address not found in node list\n");
1225         }
1226
1227         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1228         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1229 }
1230
1231 /*
1232   start the protocol going as a daemon
1233 */
1234 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1235 {
1236         int res, ret = -1;
1237         struct tevent_fd *fde;
1238
1239         /* create a unix domain stream socket to listen to */
1240         res = ux_socket_bind(ctdb);
1241         if (res!=0) {
1242                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1243                 exit(10);
1244         }
1245
1246         if (do_fork && fork()) {
1247                 return 0;
1248         }
1249
1250         tdb_reopen_all(false);
1251
1252         if (do_fork) {
1253                 if (setsid() == -1) {
1254                         ctdb_die(ctdb, "Failed to setsid()\n");
1255                 }
1256                 close(0);
1257                 if (open("/dev/null", O_RDONLY) != 0) {
1258                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1259                         exit(11);
1260                 }
1261         }
1262         ignore_signal(SIGPIPE);
1263         ignore_signal(SIGUSR1);
1264
1265         ctdb->ctdbd_pid = getpid();
1266         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1267                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1268         ctdb_create_pidfile(ctdb->ctdbd_pid);
1269
1270         /* Make sure we log something when the daemon terminates.
1271          * This must be the first exit handler to run (so the last to
1272          * be registered.
1273          */
1274         atexit(print_exit_message);
1275
1276         if (ctdb->do_setsched) {
1277                 /* try to set us up as realtime */
1278                 if (!set_scheduler()) {
1279                         exit(1);
1280                 }
1281                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1282         }
1283
1284         ctdb->ev = tevent_context_init(NULL);
1285         if (ctdb->ev == NULL) {
1286                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1287                 exit(1);
1288         }
1289         tevent_loop_allow_nesting(ctdb->ev);
1290         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1291         ret = ctdb_init_tevent_logging(ctdb);
1292         if (ret != 0) {
1293                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1294                 exit(1);
1295         }
1296
1297         /* set up a handler to pick up sigchld */
1298         if (ctdb_init_sigchld(ctdb) == NULL) {
1299                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1300                 exit(1);
1301         }
1302
1303         ctdb_set_child_logging(ctdb);
1304
1305         TALLOC_FREE(ctdb->srv);
1306         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1307                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1308                 exit(1);
1309         }
1310
1311         /* initialize statistics collection */
1312         ctdb_statistics_init(ctdb);
1313
1314         /* force initial recovery for election */
1315         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1316
1317         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1318         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1319         if (ret != 0) {
1320                 ctdb_die(ctdb, "Failed to run init event\n");
1321         }
1322         ctdb_run_notification_script(ctdb, "init");
1323
1324         if (strcmp(ctdb->transport, "tcp") == 0) {
1325                 ret = ctdb_tcp_init(ctdb);
1326         }
1327 #ifdef USE_INFINIBAND
1328         if (strcmp(ctdb->transport, "ib") == 0) {
1329                 ret = ctdb_ibw_init(ctdb);
1330         }
1331 #endif
1332         if (ret != 0) {
1333                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1334                 return -1;
1335         }
1336
1337         if (ctdb->methods == NULL) {
1338                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1339                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1340         }
1341
1342         /* Initialise the transport.  This sets the node address if it
1343          * was not set via the command-line. */
1344         if (ctdb->methods->initialise(ctdb) != 0) {
1345                 ctdb_fatal(ctdb, "transport failed to initialise");
1346         }
1347
1348         ctdb_set_my_pnn(ctdb);
1349
1350         initialise_node_flags(ctdb);
1351
1352         if (ctdb->public_addresses_file) {
1353                 ret = ctdb_set_public_addresses(ctdb, true);
1354                 if (ret == -1) {
1355                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1356                         exit(1);
1357                 }
1358         }
1359
1360         ctdb_initialise_vnn_map(ctdb);
1361
1362         /* attach to existing databases */
1363         if (ctdb_attach_databases(ctdb) != 0) {
1364                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1365         }
1366
1367         /* start frozen, then let the first election sort things out */
1368         if (!ctdb_blocking_freeze(ctdb)) {
1369                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1370         }
1371
1372         /* now start accepting clients, only can do this once frozen */
1373         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1374                             ctdb_accept_client, ctdb);
1375         if (fde == NULL) {
1376                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1377         }
1378         tevent_fd_set_auto_close(fde);
1379
1380         /* Start the transport */
1381         if (ctdb->methods->start(ctdb) != 0) {
1382                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1383                 ctdb_fatal(ctdb, "transport failed to start");
1384         }
1385
1386         /* Recovery daemon and timed events are started from the
1387          * callback, only after the setup event completes
1388          * successfully.
1389          */
1390         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1391         ret = ctdb_event_script_callback(ctdb,
1392                                          ctdb,
1393                                          ctdb_setup_event_callback,
1394                                          ctdb,
1395                                          CTDB_EVENT_SETUP,
1396                                          "%s",
1397                                          "");
1398         if (ret != 0) {
1399                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1400                 exit(1);
1401         }
1402
1403         lockdown_memory(ctdb->valgrinding);
1404
1405         /* go into a wait loop to allow other nodes to complete */
1406         tevent_loop_wait(ctdb->ev);
1407
1408         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1409         exit(1);
1410 }
1411
1412 /*
1413   allocate a packet for use in daemon<->daemon communication
1414  */
1415 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1416                                                  TALLOC_CTX *mem_ctx, 
1417                                                  enum ctdb_operation operation, 
1418                                                  size_t length, size_t slength,
1419                                                  const char *type)
1420 {
1421         int size;
1422         struct ctdb_req_header *hdr;
1423
1424         length = MAX(length, slength);
1425         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1426
1427         if (ctdb->methods == NULL) {
1428                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1429                          operation, (unsigned)length));
1430                 return NULL;
1431         }
1432
1433         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1434         if (hdr == NULL) {
1435                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1436                          operation, (unsigned)length));
1437                 return NULL;
1438         }
1439         talloc_set_name_const(hdr, type);
1440         memset(hdr, 0, slength);
1441         hdr->length       = length;
1442         hdr->operation    = operation;
1443         hdr->ctdb_magic   = CTDB_MAGIC;
1444         hdr->ctdb_version = CTDB_PROTOCOL;
1445         hdr->generation   = ctdb->vnn_map->generation;
1446         hdr->srcnode      = ctdb->pnn;
1447
1448         return hdr;     
1449 }
1450
1451 struct daemon_control_state {
1452         struct daemon_control_state *next, *prev;
1453         struct ctdb_client *client;
1454         struct ctdb_req_control_old *c;
1455         uint32_t reqid;
1456         struct ctdb_node *node;
1457 };
1458
1459 /*
1460   callback when a control reply comes in
1461  */
1462 static void daemon_control_callback(struct ctdb_context *ctdb,
1463                                     int32_t status, TDB_DATA data, 
1464                                     const char *errormsg,
1465                                     void *private_data)
1466 {
1467         struct daemon_control_state *state = talloc_get_type(private_data, 
1468                                                              struct daemon_control_state);
1469         struct ctdb_client *client = state->client;
1470         struct ctdb_reply_control_old *r;
1471         size_t len;
1472         int ret;
1473
1474         /* construct a message to send to the client containing the data */
1475         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1476         if (errormsg) {
1477                 len += strlen(errormsg);
1478         }
1479         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1480                                struct ctdb_reply_control_old);
1481         CTDB_NO_MEMORY_VOID(ctdb, r);
1482
1483         r->hdr.reqid     = state->reqid;
1484         r->status        = status;
1485         r->datalen       = data.dsize;
1486         r->errorlen = 0;
1487         memcpy(&r->data[0], data.dptr, data.dsize);
1488         if (errormsg) {
1489                 r->errorlen = strlen(errormsg);
1490                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1491         }
1492
1493         ret = daemon_queue_send(client, &r->hdr);
1494         if (ret != -1) {
1495                 talloc_free(state);
1496         }
1497 }
1498
1499 /*
1500   fail all pending controls to a disconnected node
1501  */
1502 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1503 {
1504         struct daemon_control_state *state;
1505         while ((state = node->pending_controls)) {
1506                 DLIST_REMOVE(node->pending_controls, state);
1507                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1508                                         "node is disconnected", state);
1509         }
1510 }
1511
1512 /*
1513   destroy a daemon_control_state
1514  */
1515 static int daemon_control_destructor(struct daemon_control_state *state)
1516 {
1517         if (state->node) {
1518                 DLIST_REMOVE(state->node->pending_controls, state);
1519         }
1520         return 0;
1521 }
1522
1523 /*
1524   this is called when the ctdb daemon received a ctdb request control
1525   from a local client over the unix domain socket
1526  */
1527 static void daemon_request_control_from_client(struct ctdb_client *client, 
1528                                                struct ctdb_req_control_old *c)
1529 {
1530         TDB_DATA data;
1531         int res;
1532         struct daemon_control_state *state;
1533         TALLOC_CTX *tmp_ctx = talloc_new(client);
1534
1535         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1536                 c->hdr.destnode = client->ctdb->pnn;
1537         }
1538
1539         state = talloc(client, struct daemon_control_state);
1540         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1541
1542         state->client = client;
1543         state->c = talloc_steal(state, c);
1544         state->reqid = c->hdr.reqid;
1545         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1546                 state->node = client->ctdb->nodes[c->hdr.destnode];
1547                 DLIST_ADD(state->node->pending_controls, state);
1548         } else {
1549                 state->node = NULL;
1550         }
1551
1552         talloc_set_destructor(state, daemon_control_destructor);
1553
1554         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1555                 talloc_steal(tmp_ctx, state);
1556         }
1557         
1558         data.dptr = &c->data[0];
1559         data.dsize = c->datalen;
1560         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1561                                        c->srvid, c->opcode, client->client_id,
1562                                        c->flags,
1563                                        data, daemon_control_callback,
1564                                        state);
1565         if (res != 0) {
1566                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1567                          c->hdr.destnode));
1568         }
1569
1570         talloc_free(tmp_ctx);
1571 }
1572
1573 /*
1574   register a call function
1575 */
1576 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1577                          ctdb_fn_t fn, int id)
1578 {
1579         struct ctdb_registered_call *call;
1580         struct ctdb_db_context *ctdb_db;
1581
1582         ctdb_db = find_ctdb_db(ctdb, db_id);
1583         if (ctdb_db == NULL) {
1584                 return -1;
1585         }
1586
1587         call = talloc(ctdb_db, struct ctdb_registered_call);
1588         call->fn = fn;
1589         call->id = id;
1590
1591         DLIST_ADD(ctdb_db->calls, call);        
1592         return 0;
1593 }
1594
1595
1596
1597 /*
1598   this local messaging handler is ugly, but is needed to prevent
1599   recursion in ctdb_send_message() when the destination node is the
1600   same as the source node
1601  */
1602 struct ctdb_local_message {
1603         struct ctdb_context *ctdb;
1604         uint64_t srvid;
1605         TDB_DATA data;
1606 };
1607
1608 static void ctdb_local_message_trigger(struct tevent_context *ev,
1609                                        struct tevent_timer *te,
1610                                        struct timeval t, void *private_data)
1611 {
1612         struct ctdb_local_message *m = talloc_get_type(
1613                 private_data, struct ctdb_local_message);
1614
1615         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1616         talloc_free(m);
1617 }
1618
1619 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1620 {
1621         struct ctdb_local_message *m;
1622         m = talloc(ctdb, struct ctdb_local_message);
1623         CTDB_NO_MEMORY(ctdb, m);
1624
1625         m->ctdb = ctdb;
1626         m->srvid = srvid;
1627         m->data  = data;
1628         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1629         if (m->data.dptr == NULL) {
1630                 talloc_free(m);
1631                 return -1;
1632         }
1633
1634         /* this needs to be done as an event to prevent recursion */
1635         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1636                          ctdb_local_message_trigger, m);
1637         return 0;
1638 }
1639
1640 /*
1641   send a ctdb message
1642 */
1643 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1644                              uint64_t srvid, TDB_DATA data)
1645 {
1646         struct ctdb_req_message_old *r;
1647         int len;
1648
1649         if (ctdb->methods == NULL) {
1650                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1651                 return -1;
1652         }
1653
1654         /* see if this is a message to ourselves */
1655         if (pnn == ctdb->pnn) {
1656                 return ctdb_local_message(ctdb, srvid, data);
1657         }
1658
1659         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1660         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1661                                     struct ctdb_req_message_old);
1662         CTDB_NO_MEMORY(ctdb, r);
1663
1664         r->hdr.destnode  = pnn;
1665         r->srvid         = srvid;
1666         r->datalen       = data.dsize;
1667         memcpy(&r->data[0], data.dptr, data.dsize);
1668
1669         ctdb_queue_packet(ctdb, &r->hdr);
1670
1671         talloc_free(r);
1672         return 0;
1673 }
1674
1675
1676
1677 struct ctdb_client_notify_list {
1678         struct ctdb_client_notify_list *next, *prev;
1679         struct ctdb_context *ctdb;
1680         uint64_t srvid;
1681         TDB_DATA data;
1682 };
1683
1684
1685 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1686 {
1687         int ret;
1688
1689         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1690
1691         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1692         if (ret != 0) {
1693                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1694         }
1695
1696         return 0;
1697 }
1698
1699 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1700 {
1701         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1702         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1703         struct ctdb_client_notify_list *nl;
1704
1705         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1706
1707         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1708                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1709                 return -1;
1710         }
1711
1712         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1713                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1714                 return -1;
1715         }
1716
1717
1718         if (client == NULL) {
1719                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1720                 return -1;
1721         }
1722
1723         for(nl=client->notify; nl; nl=nl->next) {
1724                 if (nl->srvid == notify->srvid) {
1725                         break;
1726                 }
1727         }
1728         if (nl != NULL) {
1729                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1730                 return -1;
1731         }
1732
1733         nl = talloc(client, struct ctdb_client_notify_list);
1734         CTDB_NO_MEMORY(ctdb, nl);
1735         nl->ctdb       = ctdb;
1736         nl->srvid      = notify->srvid;
1737         nl->data.dsize = notify->len;
1738         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1739                                        nl->data.dsize);
1740         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1741         
1742         DLIST_ADD(client->notify, nl);
1743         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1744
1745         return 0;
1746 }
1747
1748 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1749 {
1750         uint64_t srvid = *(uint64_t *)indata.dptr;
1751         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1752         struct ctdb_client_notify_list *nl;
1753
1754         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1755
1756         if (client == NULL) {
1757                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1758                 return -1;
1759         }
1760
1761         for(nl=client->notify; nl; nl=nl->next) {
1762                 if (nl->srvid == srvid) {
1763                         break;
1764                 }
1765         }
1766         if (nl == NULL) {
1767                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1768                 return -1;
1769         }
1770
1771         DLIST_REMOVE(client->notify, nl);
1772         talloc_set_destructor(nl, NULL);
1773         talloc_free(nl);
1774
1775         return 0;
1776 }
1777
1778 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1779 {
1780         struct ctdb_client_pid_list *client_pid;
1781
1782         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1783                 if (client_pid->pid == pid) {
1784                         return client_pid->client;
1785                 }
1786         }
1787         return NULL;
1788 }
1789
1790
1791 /* This control is used by samba when probing if a process (of a samba daemon)
1792    exists on the node.
1793    Samba does this when it needs/wants to check if a subrecord in one of the
1794    databases is still valied, or if it is stale and can be removed.
1795    If the node is in unhealthy or stopped state we just kill of the samba
1796    process holding htis sub-record and return to the calling samba that
1797    the process does not exist.
1798    This allows us to forcefully recall subrecords registered by samba processes
1799    on banned and stopped nodes.
1800 */
1801 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1802 {
1803         struct ctdb_client *client;
1804
1805         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1806                 client = ctdb_find_client_by_pid(ctdb, pid);
1807                 if (client != NULL) {
1808                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1809                         talloc_free(client);
1810                 }
1811                 return -1;
1812         }
1813
1814         return kill(pid, 0);
1815 }
1816
1817 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1818 {
1819         struct ctdb_node_map_old *node_map = NULL;
1820
1821         CHECK_CONTROL_DATA_SIZE(0);
1822
1823         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1824         if (node_map == NULL) {
1825                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1826                 return -1;
1827         }
1828
1829         outdata->dptr  = (unsigned char *)node_map;
1830         outdata->dsize = talloc_get_size(outdata->dptr);
1831
1832         return 0;
1833 }
1834
1835 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1836 {
1837         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1838                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1839                 return;
1840         }
1841
1842         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1843         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1844         ctdb_stop_recoverd(ctdb);
1845         ctdb_stop_keepalive(ctdb);
1846         ctdb_stop_monitoring(ctdb);
1847         ctdb_release_all_ips(ctdb);
1848         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1849         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1850                 ctdb->methods->shutdown(ctdb);
1851         }
1852
1853         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1854         exit(exit_code);
1855 }
1856
1857 /* When forking the main daemon and the child process needs to connect
1858  * back to the daemon as a client process, this function can be used
1859  * to change the ctdb context from daemon into client mode.  The child
1860  * process must be created using ctdb_fork() and not fork() -
1861  * ctdb_fork() does some necessary housekeeping.
1862  */
1863 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1864 {
1865         int ret;
1866         va_list ap;
1867
1868         /* Add extra information so we can identify this in the logs */
1869         va_start(ap, fmt);
1870         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1871         va_end(ap);
1872
1873         /* get a new event context */
1874         ctdb->ev = tevent_context_init(ctdb);
1875         if (ctdb->ev == NULL) {
1876                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1877                 exit(1);
1878         }
1879         tevent_loop_allow_nesting(ctdb->ev);
1880
1881         /* Connect to main CTDB daemon */
1882         ret = ctdb_socket_connect(ctdb);
1883         if (ret != 0) {
1884                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1885                 return -1;
1886         }
1887
1888         ctdb->can_send_controls = true;
1889
1890         return 0;
1891 }