ctdb-daemon: Send STARTUP control after startup event
[samba.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75
76
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78                                   struct timeval t, void *private_data)
79 {
80         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81
82         if (getpid() != ctdb->ctdbd_pid) {
83                 return;
84         }
85
86         tevent_add_timer(ctdb->ev, ctdb,
87                          timeval_current_ofs(1, 0),
88                          ctdb_time_tick, ctdb);
89 }
90
91 /* Used to trigger a dummy event once per second, to make
92  * detection of hangs more reliable.
93  */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 {
96         tevent_add_timer(ctdb->ev, ctdb,
97                          timeval_current_ofs(1, 0),
98                          ctdb_time_tick, ctdb);
99 }
100
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
102 {
103         /* start monitoring for connected/disconnected nodes */
104         ctdb_start_keepalive(ctdb);
105
106         /* start periodic update of tcp tickle lists */
107         ctdb_start_tcp_tickle_update(ctdb);
108
109         /* start listening for recovery daemon pings */
110         ctdb_control_recd_ping(ctdb);
111
112         /* start listening to timer ticks */
113         ctdb_start_time_tickd(ctdb);
114 }
115
116 static void ignore_signal(int signum)
117 {
118         struct sigaction act;
119
120         memset(&act, 0, sizeof(act));
121
122         act.sa_handler = SIG_IGN;
123         sigemptyset(&act.sa_mask);
124         sigaddset(&act.sa_mask, signum);
125         sigaction(signum, &act, NULL);
126 }
127
128
129 /*
130   send a packet to a client
131  */
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 {
134         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135         if (hdr->operation == CTDB_REQ_MESSAGE) {
136                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138                         talloc_free(client);
139                         return -1;
140                 }
141         }
142         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
143 }
144
145 /*
146   message handler for when we are in daemon mode. This redirects the message
147   to the right client
148  */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150                                    void *private_data)
151 {
152         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153         struct ctdb_req_message_old *r;
154         int len;
155
156         /* construct a message to send to the client containing the data */
157         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159                                len, struct ctdb_req_message_old);
160         CTDB_NO_MEMORY_VOID(client->ctdb, r);
161
162         talloc_set_name_const(r, "req_message packet");
163
164         r->srvid         = srvid;
165         r->datalen       = data.dsize;
166         memcpy(&r->data[0], data.dptr, data.dsize);
167
168         daemon_queue_send(client, &r->hdr);
169
170         talloc_free(r);
171 }
172
173 /*
174   this is called when the ctdb daemon received a ctdb request to 
175   set the srvid from the client
176  */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 {
179         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180         int res;
181         if (client == NULL) {
182                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183                 return -1;
184         }
185         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186                              client);
187         if (res != 0) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
189                          (unsigned long long)srvid));
190         } else {
191                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
192                          (unsigned long long)srvid));
193         }
194
195         return res;
196 }
197
198 /*
199   this is called when the ctdb daemon received a ctdb request to 
200   remove a srvid from the client
201  */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 {
204         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205         if (client == NULL) {
206                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207                 return -1;
208         }
209         return srvid_deregister(ctdb->srv, srvid, client);
210 }
211
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
213                            void *private_data)
214 {
215         struct ctdb_client *client =
216                 talloc_get_type_abort(private_data, struct ctdb_client);
217         struct ctdb_req_tunnel_old *c, *pkt;
218         size_t len;
219
220         pkt = (struct ctdb_req_tunnel_old *)data.dptr;
221
222         len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223         c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224                                len, struct ctdb_req_tunnel_old);
225         if (c == NULL) {
226                 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
227                 return;
228         }
229
230         talloc_set_name_const(c, "req_tunnel packet");
231
232         c->tunnel_id = tunnel_id;
233         c->flags = pkt->flags;
234         c->datalen = pkt->datalen;
235         memcpy(c->data, pkt->data, pkt->datalen);
236
237         daemon_queue_send(client, &c->hdr);
238
239         talloc_free(c);
240 }
241
242 /*
243   destroy a ctdb_client
244 */
245 static int ctdb_client_destructor(struct ctdb_client *client)
246 {
247         struct ctdb_db_context *ctdb_db;
248
249         ctdb_takeover_client_destructor_hook(client);
250         reqid_remove(client->ctdb->idr, client->client_id);
251         client->ctdb->num_clients--;
252
253         if (client->num_persistent_updates != 0) {
254                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
256         }
257         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258         if (ctdb_db) {
259                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260                                   "commit active. Forcing recovery.\n"));
261                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
262
263                 /*
264                  * trans3 transaction state:
265                  *
266                  * The destructor sets the pointer to NULL.
267                  */
268                 talloc_free(ctdb_db->persistent_state);
269         }
270
271         return 0;
272 }
273
274
275 /*
276   this is called when the ctdb daemon received a ctdb request message
277   from a local client over the unix domain socket
278  */
279 static void daemon_request_message_from_client(struct ctdb_client *client, 
280                                                struct ctdb_req_message_old *c)
281 {
282         TDB_DATA data;
283         int res;
284
285         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
287         }
288
289         /* maybe the message is for another client on this node */
290         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292                 return;
293         }
294
295         /* its for a remote node */
296         data.dptr = &c->data[0];
297         data.dsize = c->datalen;
298         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299                                        c->srvid, data);
300         if (res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302                          c->hdr.destnode));
303         }
304 }
305
306
307 struct daemon_call_state {
308         struct ctdb_client *client;
309         uint32_t reqid;
310         struct ctdb_call *call;
311         struct timeval start_time;
312
313         /* readonly request ? */
314         uint32_t readonly_fetch;
315         uint32_t client_callid;
316 };
317
318 /* 
319    complete a call from a client 
320 */
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
322 {
323         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
324                                                            struct daemon_call_state);
325         struct ctdb_reply_call_old *r;
326         int res;
327         uint32_t length;
328         struct ctdb_client *client = dstate->client;
329         struct ctdb_db_context *ctdb_db = state->ctdb_db;
330
331         talloc_steal(client, dstate);
332         talloc_steal(dstate, dstate->call);
333
334         res = ctdb_daemon_call_recv(state, dstate->call);
335         if (res != 0) {
336                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
338
339                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340                 return;
341         }
342
343         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344         /* If the client asked for readonly FETCH, we remapped this to 
345            FETCH_WITH_HEADER when calling the daemon. So we must
346            strip the extra header off the reply data before passing
347            it back to the client.
348         */
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 length -= sizeof(struct ctdb_ltdb_header);
352         }
353
354         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
355                                length, struct ctdb_reply_call_old);
356         if (r == NULL) {
357                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360                 return;
361         }
362         r->hdr.reqid        = dstate->reqid;
363         r->status           = dstate->call->status;
364
365         if (dstate->readonly_fetch
366         && dstate->client_callid == CTDB_FETCH_FUNC) {
367                 /* client only asked for a FETCH so we must strip off
368                    the extra ctdb_ltdb header
369                 */
370                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372         } else {
373                 r->datalen          = dstate->call->reply_data.dsize;
374                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
375         }
376
377         res = daemon_queue_send(client, &r->hdr);
378         if (res == -1) {
379                 /* client is dead - return immediately */
380                 return;
381         }
382         if (res != 0) {
383                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
384         }
385         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387         talloc_free(dstate);
388 }
389
390 struct ctdb_daemon_packet_wrap {
391         struct ctdb_context *ctdb;
392         uint32_t client_id;
393 };
394
395 /*
396   a wrapper to catch disconnected clients
397  */
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
399 {
400         struct ctdb_client *client;
401         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
402                                                             struct ctdb_daemon_packet_wrap);
403         if (w == NULL) {
404                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405                 return;
406         }
407
408         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409         if (client == NULL) {
410                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411                          w->client_id));
412                 talloc_free(w);
413                 return;
414         }
415         talloc_free(w);
416
417         /* process it */
418         daemon_incoming_packet(client, hdr);    
419 }
420
421 struct ctdb_deferred_fetch_call {
422         struct ctdb_deferred_fetch_call *next, *prev;
423         struct ctdb_req_call_old *c;
424         struct ctdb_daemon_packet_wrap *w;
425 };
426
427 struct ctdb_deferred_fetch_queue {
428         struct ctdb_deferred_fetch_call *deferred_calls;
429 };
430
431 struct ctdb_deferred_requeue {
432         struct ctdb_deferred_fetch_call *dfc;
433         struct ctdb_client *client;
434 };
435
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438                                     struct tevent_timer *te,
439                                     struct timeval t, void *private_data)
440 {
441         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442         struct ctdb_client *client = dfr->client;
443
444         talloc_steal(client, dfr->dfc->c);
445         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446         talloc_free(dfr);
447 }
448
449 /* the referral context is destroyed either after a timeout or when the initial
450    fetch-lock has finished.
451    at this stage, immediately start reprocessing the queued up deferred
452    calls so they get reprocessed immediately (and since we are dmaster at
453    this stage, trigger the waiting smbd processes to pick up and aquire the
454    record right away.
455 */
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
457 {
458
459         /* need to reprocess the packets from the queue explicitely instead of
460            just using a normal destructor since we want, need, to
461            call the clients in the same oder as the requests queued up
462         */
463         while (dfq->deferred_calls != NULL) {
464                 struct ctdb_client *client;
465                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466                 struct ctdb_deferred_requeue *dfr;
467
468                 DLIST_REMOVE(dfq->deferred_calls, dfc);
469
470                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471                 if (client == NULL) {
472                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473                                  dfc->w->client_id));
474                         continue;
475                 }
476
477                 /* process it by pushing it back onto the eventloop */
478                 dfr = talloc(client, struct ctdb_deferred_requeue);
479                 if (dfr == NULL) {
480                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481                         continue;
482                 }
483
484                 dfr->dfc    = talloc_steal(dfr, dfc);
485                 dfr->client = client;
486
487                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488                                  reprocess_deferred_call, dfr);
489         }
490
491         return 0;
492 }
493
494 /* insert the new deferral context into the rb tree.
495    there should never be a pre-existing context here, but check for it
496    warn and destroy the previous context if there is already a deferral context
497    for this key.
498 */
499 static void *insert_dfq_callback(void *parm, void *data)
500 {
501         if (data) {
502                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503                 talloc_free(data);
504         }
505         return parm;
506 }
507
508 /* if the original fetch-lock did not complete within a reasonable time,
509    free the context and context for all deferred requests to cause them to be
510    re-inserted into the event system.
511 */
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513                         struct timeval t, void *private_data)
514 {
515         talloc_free(private_data);
516 }
517
518 /* This function is used in the local daemon to register a KEY in a database
519    for being "fetched"
520    While the remote fetch is in-flight, any futher attempts to re-fetch the
521    same record will be deferred until the fetch completes.
522 */
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
524 {
525         uint32_t *k;
526         struct ctdb_deferred_fetch_queue *dfq;
527
528         k = ctdb_key_to_idkey(call, call->key);
529         if (k == NULL) {
530                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531                 return -1;
532         }
533
534         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
535         if (dfq == NULL) {
536                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537                 talloc_free(k);
538                 return -1;
539         }
540         dfq->deferred_calls = NULL;
541
542         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
543
544         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
545
546         /* if the fetch havent completed in 30 seconds, just tear it all down
547            and let it try again as the events are reissued */
548         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549                          dfq_timeout, dfq);
550
551         talloc_free(k);
552         return 0;
553 }
554
555 /* check if this is a duplicate request to a fetch already in-flight
556    if it is, make this call deferred to be reprocessed later when
557    the in-flight fetch completes.
558 */
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
560 {
561         uint32_t *k;
562         struct ctdb_deferred_fetch_queue *dfq;
563         struct ctdb_deferred_fetch_call *dfc;
564
565         k = ctdb_key_to_idkey(c, key);
566         if (k == NULL) {
567                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568                 return -1;
569         }
570
571         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572         if (dfq == NULL) {
573                 talloc_free(k);
574                 return -1;
575         }
576
577
578         talloc_free(k);
579
580         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581         if (dfc == NULL) {
582                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583                 return -1;
584         }
585
586         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587         if (dfc->w == NULL) {
588                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589                 talloc_free(dfc);
590                 return -1;
591         }
592
593         dfc->c = talloc_steal(dfc, c);
594         dfc->w->ctdb = ctdb_db->ctdb;
595         dfc->w->client_id = client->client_id;
596
597         DLIST_ADD_END(dfq->deferred_calls, dfc);
598
599         return 0;
600 }
601
602
603 /*
604   this is called when the ctdb daemon received a ctdb request call
605   from a local client over the unix domain socket
606  */
607 static void daemon_request_call_from_client(struct ctdb_client *client, 
608                                             struct ctdb_req_call_old *c)
609 {
610         struct ctdb_call_state *state;
611         struct ctdb_db_context *ctdb_db;
612         struct daemon_call_state *dstate;
613         struct ctdb_call *call;
614         struct ctdb_ltdb_header header;
615         TDB_DATA key, data;
616         int ret;
617         struct ctdb_context *ctdb = client->ctdb;
618         struct ctdb_daemon_packet_wrap *w;
619
620         CTDB_INCREMENT_STAT(ctdb, total_calls);
621         CTDB_INCREMENT_STAT(ctdb, pending_calls);
622
623         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624         if (!ctdb_db) {
625                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626                           c->db_id));
627                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628                 return;
629         }
630
631         if (ctdb_db->unhealthy_reason) {
632                 /*
633                  * this is just a warning, as the tdb should be empty anyway,
634                  * and only persistent databases can be unhealthy, which doesn't
635                  * use this code patch
636                  */
637                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
639         }
640
641         key.dptr = c->data;
642         key.dsize = c->keylen;
643
644         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645         CTDB_NO_MEMORY_VOID(ctdb, w);   
646
647         w->ctdb = ctdb;
648         w->client_id = client->client_id;
649
650         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
651                                            (struct ctdb_req_header *)c, &data,
652                                            daemon_incoming_packet_wrap, w, true);
653         if (ret == -2) {
654                 /* will retry later */
655                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656                 return;
657         }
658
659         talloc_free(w);
660
661         if (ret != 0) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664                 return;
665         }
666
667
668         /* check if this fetch request is a duplicate for a
669            request we already have in flight. If so defer it until
670            the first request completes.
671         */
672         if (ctdb->tunable.fetch_collapse == 1) {
673                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674                         ret = ctdb_ltdb_unlock(ctdb_db, key);
675                         if (ret != 0) {
676                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
677                         }
678                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
679                         talloc_free(data.dptr);
680                         return;
681                 }
682         }
683
684         /* Dont do READONLY if we don't have a tracking database */
685         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686                 c->flags &= ~CTDB_WANT_READONLY;
687         }
688
689         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690                 header.flags &= ~CTDB_REC_RO_FLAGS;
691                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
695                 }
696                 /* and clear out the tracking data */
697                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
699                 }
700         }
701
702         /* if we are revoking, we must defer all other calls until the revoke
703          * had completed.
704          */
705         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706                 talloc_free(data.dptr);
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711                 }
712                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
713                 return;
714         }
715
716         if ((header.dmaster == ctdb->pnn)
717         && (!(c->flags & CTDB_WANT_READONLY))
718         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
720                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
722                 }
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724
725                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726                         ctdb_fatal(ctdb, "Failed to start record revoke");
727                 }
728                 talloc_free(data.dptr);
729
730                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
732                 }
733
734                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
735                 return;
736         }               
737
738         dstate = talloc(client, struct daemon_call_state);
739         if (dstate == NULL) {
740                 ret = ctdb_ltdb_unlock(ctdb_db, key);
741                 if (ret != 0) {
742                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
743                 }
744
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
747                 return;
748         }
749         dstate->start_time = timeval_current();
750         dstate->client = client;
751         dstate->reqid  = c->hdr.reqid;
752         talloc_steal(dstate, data.dptr);
753
754         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
755         if (call == NULL) {
756                 ret = ctdb_ltdb_unlock(ctdb_db, key);
757                 if (ret != 0) {
758                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
759                 }
760
761                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
764                 return;
765         }
766
767         dstate->readonly_fetch = 0;
768         call->call_id = c->callid;
769         call->key = key;
770         call->call_data.dptr = c->data + c->keylen;
771         call->call_data.dsize = c->calldatalen;
772         call->flags = c->flags;
773
774         if (c->flags & CTDB_WANT_READONLY) {
775                 /* client wants readonly record, so translate this into a 
776                    fetch with header. remember what the client asked for
777                    so we can remap the reply back to the proper format for
778                    the client in the reply
779                  */
780                 dstate->client_callid = call->call_id;
781                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782                 dstate->readonly_fetch = 1;
783         }
784
785         if (header.dmaster == ctdb->pnn) {
786                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
787         } else {
788                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789                 if (ctdb->tunable.fetch_collapse == 1) {
790                         /* This request triggered a remote fetch-lock.
791                            set up a deferral for this key so any additional
792                            fetch-locks are deferred until the current one
793                            finishes.
794                          */
795                         setup_deferred_fetch_locks(ctdb_db, call);
796                 }
797         }
798
799         ret = ctdb_ltdb_unlock(ctdb_db, key);
800         if (ret != 0) {
801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
802         }
803
804         if (state == NULL) {
805                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
808                 return;
809         }
810         talloc_steal(state, dstate);
811         talloc_steal(client, state);
812
813         state->async.fn = daemon_call_from_client_callback;
814         state->async.private_data = dstate;
815 }
816
817
818 static void daemon_request_control_from_client(struct ctdb_client *client, 
819                                                struct ctdb_req_control_old *c);
820 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
821                                               struct ctdb_req_tunnel_old *c);
822
823 /* data contains a packet from the client */
824 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
825 {
826         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
827         TALLOC_CTX *tmp_ctx;
828         struct ctdb_context *ctdb = client->ctdb;
829
830         /* place the packet as a child of a tmp_ctx. We then use
831            talloc_free() below to free it. If any of the calls want
832            to keep it, then they will steal it somewhere else, and the
833            talloc_free() will be a no-op */
834         tmp_ctx = talloc_new(client);
835         talloc_steal(tmp_ctx, hdr);
836
837         if (hdr->ctdb_magic != CTDB_MAGIC) {
838                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
839                 goto done;
840         }
841
842         if (hdr->ctdb_version != CTDB_PROTOCOL) {
843                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
844                 goto done;
845         }
846
847         switch (hdr->operation) {
848         case CTDB_REQ_CALL:
849                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
850                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
851                 break;
852
853         case CTDB_REQ_MESSAGE:
854                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
855                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
856                 break;
857
858         case CTDB_REQ_CONTROL:
859                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
860                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
861                 break;
862
863         case CTDB_REQ_TUNNEL:
864                 CTDB_INCREMENT_STAT(ctdb, client.req_tunnel);
865                 daemon_request_tunnel_from_client(client, (struct ctdb_req_tunnel_old *)hdr);
866                 break;
867
868         default:
869                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
870                          hdr->operation));
871         }
872
873 done:
874         talloc_free(tmp_ctx);
875 }
876
877 /*
878   called when the daemon gets a incoming packet
879  */
880 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
881 {
882         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
883         struct ctdb_req_header *hdr;
884
885         if (cnt == 0) {
886                 talloc_free(client);
887                 return;
888         }
889
890         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
891
892         if (cnt < sizeof(*hdr)) {
893                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
894                                (unsigned)cnt);
895                 return;
896         }
897         hdr = (struct ctdb_req_header *)data;
898         if (cnt != hdr->length) {
899                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
900                                (unsigned)hdr->length, (unsigned)cnt);
901                 return;
902         }
903
904         if (hdr->ctdb_magic != CTDB_MAGIC) {
905                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
906                 return;
907         }
908
909         if (hdr->ctdb_version != CTDB_PROTOCOL) {
910                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
911                 return;
912         }
913
914         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
915                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
916                  hdr->srcnode, hdr->destnode));
917
918         /* it is the responsibility of the incoming packet function to free 'data' */
919         daemon_incoming_packet(client, hdr);
920 }
921
922
923 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
924 {
925         if (client_pid->ctdb->client_pids != NULL) {
926                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
927         }
928
929         return 0;
930 }
931
932
933 static void ctdb_accept_client(struct tevent_context *ev,
934                                struct tevent_fd *fde, uint16_t flags,
935                                void *private_data)
936 {
937         struct sockaddr_un addr;
938         socklen_t len;
939         int fd;
940         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
941         struct ctdb_client *client;
942         struct ctdb_client_pid_list *client_pid;
943         pid_t peer_pid = 0;
944         int ret;
945
946         memset(&addr, 0, sizeof(addr));
947         len = sizeof(addr);
948         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
949         if (fd == -1) {
950                 return;
951         }
952
953         ret = set_blocking(fd, false);
954         if (ret != 0) {
955                 DEBUG(DEBUG_ERR,
956                       (__location__
957                        " failed to set socket non-blocking (%s)\n",
958                        strerror(errno)));
959                 close(fd);
960                 return;
961         }
962
963         set_close_on_exec(fd);
964
965         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
966
967         client = talloc_zero(ctdb, struct ctdb_client);
968         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
969                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
970         }
971
972         client->ctdb = ctdb;
973         client->fd = fd;
974         client->client_id = reqid_new(ctdb->idr, client);
975         client->pid = peer_pid;
976
977         client_pid = talloc(client, struct ctdb_client_pid_list);
978         if (client_pid == NULL) {
979                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
980                 close(fd);
981                 talloc_free(client);
982                 return;
983         }               
984         client_pid->ctdb   = ctdb;
985         client_pid->pid    = peer_pid;
986         client_pid->client = client;
987
988         DLIST_ADD(ctdb->client_pids, client_pid);
989
990         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
991                                          ctdb_daemon_read_cb, client,
992                                          "client-%u", client->pid);
993
994         talloc_set_destructor(client, ctdb_client_destructor);
995         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
996         ctdb->num_clients++;
997 }
998
999
1000
1001 /*
1002   create a unix domain socket and bind it
1003   return a file descriptor open on the socket 
1004 */
1005 static int ux_socket_bind(struct ctdb_context *ctdb)
1006 {
1007         struct sockaddr_un addr;
1008         int ret;
1009
1010         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1011         if (ctdb->daemon.sd == -1) {
1012                 return -1;
1013         }
1014
1015         memset(&addr, 0, sizeof(addr));
1016         addr.sun_family = AF_UNIX;
1017         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1018
1019         if (! sock_clean(ctdb->daemon.name)) {
1020                 return -1;
1021         }
1022
1023         set_close_on_exec(ctdb->daemon.sd);
1024
1025         ret = set_blocking(ctdb->daemon.sd, false);
1026         if (ret != 0) {
1027                 DEBUG(DEBUG_ERR,
1028                       (__location__
1029                        " failed to set socket non-blocking (%s)\n",
1030                        strerror(errno)));
1031                 goto failed;
1032         }
1033
1034         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1035                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1036                 goto failed;
1037         }
1038
1039         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1040             chmod(ctdb->daemon.name, 0700) != 0) {
1041                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1042                 goto failed;
1043         }
1044
1045
1046         if (listen(ctdb->daemon.sd, 100) != 0) {
1047                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1048                 goto failed;
1049         }
1050
1051         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1052                              ctdb->daemon.name));
1053         return 0;
1054
1055 failed:
1056         close(ctdb->daemon.sd);
1057         ctdb->daemon.sd = -1;
1058         return -1;      
1059 }
1060
1061 static void initialise_node_flags (struct ctdb_context *ctdb)
1062 {
1063         if (ctdb->pnn == -1) {
1064                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1065         }
1066
1067         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1068
1069         /* do we start out in DISABLED mode? */
1070         if (ctdb->start_as_disabled != 0) {
1071                 DEBUG(DEBUG_ERR,
1072                       ("This node is configured to start in DISABLED state\n"));
1073                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1074         }
1075         /* do we start out in STOPPED mode? */
1076         if (ctdb->start_as_stopped != 0) {
1077                 DEBUG(DEBUG_ERR,
1078                       ("This node is configured to start in STOPPED state\n"));
1079                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1080         }
1081 }
1082
1083 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1084                                       void *private_data)
1085 {
1086         if (status != 0) {
1087                 ctdb_die(ctdb, "Failed to run setup event");
1088         }
1089         ctdb_run_notification_script(ctdb, "setup");
1090
1091         /* Start the recovery daemon */
1092         if (ctdb_start_recoverd(ctdb) != 0) {
1093                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1094                 exit(11);
1095         }
1096
1097         ctdb_start_periodic_events(ctdb);
1098
1099         ctdb_wait_for_first_recovery(ctdb);
1100 }
1101
1102 static struct timeval tevent_before_wait_ts;
1103 static struct timeval tevent_after_wait_ts;
1104
1105 static void ctdb_tevent_trace_init(void)
1106 {
1107         struct timeval now;
1108
1109         now = timeval_current();
1110
1111         tevent_before_wait_ts = now;
1112         tevent_after_wait_ts = now;
1113 }
1114
1115 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1116                               void *private_data)
1117 {
1118         struct timeval diff;
1119         struct timeval now;
1120         struct ctdb_context *ctdb =
1121                 talloc_get_type(private_data, struct ctdb_context);
1122
1123         if (getpid() != ctdb->ctdbd_pid) {
1124                 return;
1125         }
1126
1127         now = timeval_current();
1128
1129         switch (tp) {
1130         case TEVENT_TRACE_BEFORE_WAIT:
1131                 diff = timeval_until(&tevent_after_wait_ts, &now);
1132                 if (diff.tv_sec > 3) {
1133                         DEBUG(DEBUG_ERR,
1134                               ("Handling event took %ld seconds!\n",
1135                                (long)diff.tv_sec));
1136                 }
1137                 tevent_before_wait_ts = now;
1138                 break;
1139
1140         case TEVENT_TRACE_AFTER_WAIT:
1141                 diff = timeval_until(&tevent_before_wait_ts, &now);
1142                 if (diff.tv_sec > 3) {
1143                         DEBUG(DEBUG_ERR,
1144                               ("No event for %ld seconds!\n",
1145                                (long)diff.tv_sec));
1146                 }
1147                 tevent_after_wait_ts = now;
1148                 break;
1149
1150         default:
1151                 /* Do nothing for future tevent trace points */ ;
1152         }
1153 }
1154
1155 static void ctdb_remove_pidfile(void)
1156 {
1157         TALLOC_FREE(ctdbd_pidfile_ctx);
1158 }
1159
1160 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1161 {
1162         if (ctdbd_pidfile != NULL) {
1163                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1164                                                  &ctdbd_pidfile_ctx);
1165                 if (ret != 0) {
1166                         DEBUG(DEBUG_ERR,
1167                               ("Failed to create PID file %s\n",
1168                                ctdbd_pidfile));
1169                         exit(11);
1170                 }
1171
1172                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1173                 atexit(ctdb_remove_pidfile);
1174         }
1175 }
1176
1177 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1178 {
1179         int i, j, count;
1180
1181         /* initialize the vnn mapping table, skipping any deleted nodes */
1182         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1183         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1184
1185         count = 0;
1186         for (i = 0; i < ctdb->num_nodes; i++) {
1187                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1188                         count++;
1189                 }
1190         }
1191
1192         ctdb->vnn_map->generation = INVALID_GENERATION;
1193         ctdb->vnn_map->size = count;
1194         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1195         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1196
1197         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1198                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1199                         continue;
1200                 }
1201                 ctdb->vnn_map->map[j] = i;
1202                 j++;
1203         }
1204 }
1205
1206 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1207 {
1208         int nodeid;
1209
1210         if (ctdb->address == NULL) {
1211                 ctdb_fatal(ctdb,
1212                            "Can not determine PNN - node address is not set\n");
1213         }
1214
1215         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1216         if (nodeid == -1) {
1217                 ctdb_fatal(ctdb,
1218                            "Can not determine PNN - node address not found in node list\n");
1219         }
1220
1221         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1222         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1223 }
1224
1225 /*
1226   start the protocol going as a daemon
1227 */
1228 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1229 {
1230         int res, ret = -1;
1231         struct tevent_fd *fde;
1232
1233         become_daemon(do_fork, false, false);
1234
1235         ignore_signal(SIGPIPE);
1236         ignore_signal(SIGUSR1);
1237
1238         ctdb->ctdbd_pid = getpid();
1239         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1240                           ctdb_version_string, ctdb->ctdbd_pid));
1241         ctdb_create_pidfile(ctdb);
1242
1243         /* create a unix domain stream socket to listen to */
1244         res = ux_socket_bind(ctdb);
1245         if (res!=0) {
1246                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1247                 exit(10);
1248         }
1249
1250         /* Make sure we log something when the daemon terminates.
1251          * This must be the first exit handler to run (so the last to
1252          * be registered.
1253          */
1254         __ctdbd_pid = getpid();
1255         atexit(print_exit_message);
1256
1257         if (ctdb->do_setsched) {
1258                 /* try to set us up as realtime */
1259                 if (!set_scheduler()) {
1260                         exit(1);
1261                 }
1262                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1263         }
1264
1265         ctdb->ev = tevent_context_init(NULL);
1266         if (ctdb->ev == NULL) {
1267                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1268                 exit(1);
1269         }
1270         tevent_loop_allow_nesting(ctdb->ev);
1271         ctdb_tevent_trace_init();
1272         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1273
1274         /* set up a handler to pick up sigchld */
1275         if (ctdb_init_sigchld(ctdb) == NULL) {
1276                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1277                 exit(1);
1278         }
1279
1280         if (do_fork) {
1281                 ctdb_set_child_logging(ctdb);
1282         }
1283
1284         TALLOC_FREE(ctdb->srv);
1285         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1286                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1287                 exit(1);
1288         }
1289
1290         TALLOC_FREE(ctdb->tunnels);
1291         if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1292                 DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1293                 exit(1);
1294         }
1295
1296         /* initialize statistics collection */
1297         ctdb_statistics_init(ctdb);
1298
1299         /* force initial recovery for election */
1300         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1301
1302         if (ctdb_start_eventd(ctdb) != 0) {
1303                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1304                 exit(1);
1305         }
1306
1307         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1308         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1309         if (ret != 0) {
1310                 ctdb_die(ctdb, "Failed to run init event\n");
1311         }
1312         ctdb_run_notification_script(ctdb, "init");
1313
1314         if (strcmp(ctdb->transport, "tcp") == 0) {
1315                 ret = ctdb_tcp_init(ctdb);
1316         }
1317 #ifdef USE_INFINIBAND
1318         if (strcmp(ctdb->transport, "ib") == 0) {
1319                 ret = ctdb_ibw_init(ctdb);
1320         }
1321 #endif
1322         if (ret != 0) {
1323                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1324                 return -1;
1325         }
1326
1327         if (ctdb->methods == NULL) {
1328                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1329                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1330         }
1331
1332         /* Initialise the transport.  This sets the node address if it
1333          * was not set via the command-line. */
1334         if (ctdb->methods->initialise(ctdb) != 0) {
1335                 ctdb_fatal(ctdb, "transport failed to initialise");
1336         }
1337
1338         ctdb_set_my_pnn(ctdb);
1339
1340         initialise_node_flags(ctdb);
1341
1342         if (ctdb->public_addresses_file) {
1343                 ret = ctdb_set_public_addresses(ctdb, true);
1344                 if (ret == -1) {
1345                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1346                         exit(1);
1347                 }
1348         }
1349
1350         ctdb_initialise_vnn_map(ctdb);
1351
1352         /* attach to existing databases */
1353         if (ctdb_attach_databases(ctdb) != 0) {
1354                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1355         }
1356
1357         /* start frozen, then let the first election sort things out */
1358         if (!ctdb_blocking_freeze(ctdb)) {
1359                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1360         }
1361
1362         /* now start accepting clients, only can do this once frozen */
1363         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1364                             ctdb_accept_client, ctdb);
1365         if (fde == NULL) {
1366                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1367         }
1368         tevent_fd_set_auto_close(fde);
1369
1370         /* Start the transport */
1371         if (ctdb->methods->start(ctdb) != 0) {
1372                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1373                 ctdb_fatal(ctdb, "transport failed to start");
1374         }
1375
1376         /* Recovery daemon and timed events are started from the
1377          * callback, only after the setup event completes
1378          * successfully.
1379          */
1380         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1381         ret = ctdb_event_script_callback(ctdb,
1382                                          ctdb,
1383                                          ctdb_setup_event_callback,
1384                                          ctdb,
1385                                          CTDB_EVENT_SETUP,
1386                                          "%s",
1387                                          "");
1388         if (ret != 0) {
1389                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1390                 exit(1);
1391         }
1392
1393         lockdown_memory(ctdb->valgrinding);
1394
1395         /* go into a wait loop to allow other nodes to complete */
1396         tevent_loop_wait(ctdb->ev);
1397
1398         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1399         exit(1);
1400 }
1401
1402 /*
1403   allocate a packet for use in daemon<->daemon communication
1404  */
1405 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1406                                                  TALLOC_CTX *mem_ctx, 
1407                                                  enum ctdb_operation operation, 
1408                                                  size_t length, size_t slength,
1409                                                  const char *type)
1410 {
1411         int size;
1412         struct ctdb_req_header *hdr;
1413
1414         length = MAX(length, slength);
1415         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1416
1417         if (ctdb->methods == NULL) {
1418                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1419                          operation, (unsigned)length));
1420                 return NULL;
1421         }
1422
1423         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1424         if (hdr == NULL) {
1425                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1426                          operation, (unsigned)length));
1427                 return NULL;
1428         }
1429         talloc_set_name_const(hdr, type);
1430         memset(hdr, 0, slength);
1431         hdr->length       = length;
1432         hdr->operation    = operation;
1433         hdr->ctdb_magic   = CTDB_MAGIC;
1434         hdr->ctdb_version = CTDB_PROTOCOL;
1435         hdr->generation   = ctdb->vnn_map->generation;
1436         hdr->srcnode      = ctdb->pnn;
1437
1438         return hdr;     
1439 }
1440
1441 struct daemon_control_state {
1442         struct daemon_control_state *next, *prev;
1443         struct ctdb_client *client;
1444         struct ctdb_req_control_old *c;
1445         uint32_t reqid;
1446         struct ctdb_node *node;
1447 };
1448
1449 /*
1450   callback when a control reply comes in
1451  */
1452 static void daemon_control_callback(struct ctdb_context *ctdb,
1453                                     int32_t status, TDB_DATA data, 
1454                                     const char *errormsg,
1455                                     void *private_data)
1456 {
1457         struct daemon_control_state *state = talloc_get_type(private_data, 
1458                                                              struct daemon_control_state);
1459         struct ctdb_client *client = state->client;
1460         struct ctdb_reply_control_old *r;
1461         size_t len;
1462         int ret;
1463
1464         /* construct a message to send to the client containing the data */
1465         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1466         if (errormsg) {
1467                 len += strlen(errormsg);
1468         }
1469         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1470                                struct ctdb_reply_control_old);
1471         CTDB_NO_MEMORY_VOID(ctdb, r);
1472
1473         r->hdr.reqid     = state->reqid;
1474         r->status        = status;
1475         r->datalen       = data.dsize;
1476         r->errorlen = 0;
1477         memcpy(&r->data[0], data.dptr, data.dsize);
1478         if (errormsg) {
1479                 r->errorlen = strlen(errormsg);
1480                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1481         }
1482
1483         ret = daemon_queue_send(client, &r->hdr);
1484         if (ret != -1) {
1485                 talloc_free(state);
1486         }
1487 }
1488
1489 /*
1490   fail all pending controls to a disconnected node
1491  */
1492 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1493 {
1494         struct daemon_control_state *state;
1495         while ((state = node->pending_controls)) {
1496                 DLIST_REMOVE(node->pending_controls, state);
1497                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1498                                         "node is disconnected", state);
1499         }
1500 }
1501
1502 /*
1503   destroy a daemon_control_state
1504  */
1505 static int daemon_control_destructor(struct daemon_control_state *state)
1506 {
1507         if (state->node) {
1508                 DLIST_REMOVE(state->node->pending_controls, state);
1509         }
1510         return 0;
1511 }
1512
1513 /*
1514   this is called when the ctdb daemon received a ctdb request control
1515   from a local client over the unix domain socket
1516  */
1517 static void daemon_request_control_from_client(struct ctdb_client *client, 
1518                                                struct ctdb_req_control_old *c)
1519 {
1520         TDB_DATA data;
1521         int res;
1522         struct daemon_control_state *state;
1523         TALLOC_CTX *tmp_ctx = talloc_new(client);
1524
1525         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1526                 c->hdr.destnode = client->ctdb->pnn;
1527         }
1528
1529         state = talloc(client, struct daemon_control_state);
1530         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1531
1532         state->client = client;
1533         state->c = talloc_steal(state, c);
1534         state->reqid = c->hdr.reqid;
1535         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1536                 state->node = client->ctdb->nodes[c->hdr.destnode];
1537                 DLIST_ADD(state->node->pending_controls, state);
1538         } else {
1539                 state->node = NULL;
1540         }
1541
1542         talloc_set_destructor(state, daemon_control_destructor);
1543
1544         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1545                 talloc_steal(tmp_ctx, state);
1546         }
1547         
1548         data.dptr = &c->data[0];
1549         data.dsize = c->datalen;
1550         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1551                                        c->srvid, c->opcode, client->client_id,
1552                                        c->flags,
1553                                        data, daemon_control_callback,
1554                                        state);
1555         if (res != 0) {
1556                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1557                          c->hdr.destnode));
1558         }
1559
1560         talloc_free(tmp_ctx);
1561 }
1562
1563 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
1564                                               struct ctdb_req_tunnel_old *c)
1565 {
1566         TDB_DATA data;
1567         int ret;
1568
1569         if (! ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1570                 DEBUG(DEBUG_ERR, ("Invalid destination 0x%x\n",
1571                                   c->hdr.destnode));
1572                 return;
1573         }
1574
1575         ret = srvid_exists(client->ctdb->tunnels, c->tunnel_id, NULL);
1576         if (ret != 0) {
1577                 DEBUG(DEBUG_ERR,
1578                       ("tunnel id 0x%"PRIx64" not registered, dropping pkt\n",
1579                        c->tunnel_id));
1580                 return;
1581         }
1582
1583         data = (TDB_DATA) {
1584                 .dsize = c->datalen,
1585                 .dptr = &c->data[0],
1586         };
1587
1588         ret = ctdb_daemon_send_tunnel(client->ctdb, c->hdr.destnode,
1589                                       c->tunnel_id, c->flags, data);
1590         if (ret != 0) {
1591                 DEBUG(DEBUG_ERR, ("Failed to set tunnel to remote note %u\n",
1592                                   c->hdr.destnode));
1593         }
1594 }
1595
1596 /*
1597   register a call function
1598 */
1599 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1600                          ctdb_fn_t fn, int id)
1601 {
1602         struct ctdb_registered_call *call;
1603         struct ctdb_db_context *ctdb_db;
1604
1605         ctdb_db = find_ctdb_db(ctdb, db_id);
1606         if (ctdb_db == NULL) {
1607                 return -1;
1608         }
1609
1610         call = talloc(ctdb_db, struct ctdb_registered_call);
1611         call->fn = fn;
1612         call->id = id;
1613
1614         DLIST_ADD(ctdb_db->calls, call);        
1615         return 0;
1616 }
1617
1618
1619
1620 /*
1621   this local messaging handler is ugly, but is needed to prevent
1622   recursion in ctdb_send_message() when the destination node is the
1623   same as the source node
1624  */
1625 struct ctdb_local_message {
1626         struct ctdb_context *ctdb;
1627         uint64_t srvid;
1628         TDB_DATA data;
1629 };
1630
1631 static void ctdb_local_message_trigger(struct tevent_context *ev,
1632                                        struct tevent_timer *te,
1633                                        struct timeval t, void *private_data)
1634 {
1635         struct ctdb_local_message *m = talloc_get_type(
1636                 private_data, struct ctdb_local_message);
1637
1638         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1639         talloc_free(m);
1640 }
1641
1642 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1643 {
1644         struct ctdb_local_message *m;
1645         m = talloc(ctdb, struct ctdb_local_message);
1646         CTDB_NO_MEMORY(ctdb, m);
1647
1648         m->ctdb = ctdb;
1649         m->srvid = srvid;
1650         m->data  = data;
1651         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1652         if (m->data.dptr == NULL) {
1653                 talloc_free(m);
1654                 return -1;
1655         }
1656
1657         /* this needs to be done as an event to prevent recursion */
1658         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1659                          ctdb_local_message_trigger, m);
1660         return 0;
1661 }
1662
1663 /*
1664   send a ctdb message
1665 */
1666 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1667                              uint64_t srvid, TDB_DATA data)
1668 {
1669         struct ctdb_req_message_old *r;
1670         int len;
1671
1672         if (ctdb->methods == NULL) {
1673                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1674                 return -1;
1675         }
1676
1677         /* see if this is a message to ourselves */
1678         if (pnn == ctdb->pnn) {
1679                 return ctdb_local_message(ctdb, srvid, data);
1680         }
1681
1682         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1683         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1684                                     struct ctdb_req_message_old);
1685         CTDB_NO_MEMORY(ctdb, r);
1686
1687         r->hdr.destnode  = pnn;
1688         r->srvid         = srvid;
1689         r->datalen       = data.dsize;
1690         memcpy(&r->data[0], data.dptr, data.dsize);
1691
1692         ctdb_queue_packet(ctdb, &r->hdr);
1693
1694         talloc_free(r);
1695         return 0;
1696 }
1697
1698
1699
1700 struct ctdb_client_notify_list {
1701         struct ctdb_client_notify_list *next, *prev;
1702         struct ctdb_context *ctdb;
1703         uint64_t srvid;
1704         TDB_DATA data;
1705 };
1706
1707
1708 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1709 {
1710         int ret;
1711
1712         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1713
1714         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1715         if (ret != 0) {
1716                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1717         }
1718
1719         return 0;
1720 }
1721
1722 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1723 {
1724         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1725         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1726         struct ctdb_client_notify_list *nl;
1727
1728         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1729
1730         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1731                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1732                 return -1;
1733         }
1734
1735         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1736                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1737                 return -1;
1738         }
1739
1740
1741         if (client == NULL) {
1742                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1743                 return -1;
1744         }
1745
1746         for(nl=client->notify; nl; nl=nl->next) {
1747                 if (nl->srvid == notify->srvid) {
1748                         break;
1749                 }
1750         }
1751         if (nl != NULL) {
1752                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1753                 return -1;
1754         }
1755
1756         nl = talloc(client, struct ctdb_client_notify_list);
1757         CTDB_NO_MEMORY(ctdb, nl);
1758         nl->ctdb       = ctdb;
1759         nl->srvid      = notify->srvid;
1760         nl->data.dsize = notify->len;
1761         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1762                                        nl->data.dsize);
1763         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1764         
1765         DLIST_ADD(client->notify, nl);
1766         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1767
1768         return 0;
1769 }
1770
1771 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1772 {
1773         uint64_t srvid = *(uint64_t *)indata.dptr;
1774         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1775         struct ctdb_client_notify_list *nl;
1776
1777         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1778
1779         if (client == NULL) {
1780                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1781                 return -1;
1782         }
1783
1784         for(nl=client->notify; nl; nl=nl->next) {
1785                 if (nl->srvid == srvid) {
1786                         break;
1787                 }
1788         }
1789         if (nl == NULL) {
1790                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1791                 return -1;
1792         }
1793
1794         DLIST_REMOVE(client->notify, nl);
1795         talloc_set_destructor(nl, NULL);
1796         talloc_free(nl);
1797
1798         return 0;
1799 }
1800
1801 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1802 {
1803         struct ctdb_client_pid_list *client_pid;
1804
1805         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1806                 if (client_pid->pid == pid) {
1807                         return client_pid->client;
1808                 }
1809         }
1810         return NULL;
1811 }
1812
1813
1814 /* This control is used by samba when probing if a process (of a samba daemon)
1815    exists on the node.
1816    Samba does this when it needs/wants to check if a subrecord in one of the
1817    databases is still valid, or if it is stale and can be removed.
1818    If the node is in unhealthy or stopped state we just kill of the samba
1819    process holding this sub-record and return to the calling samba that
1820    the process does not exist.
1821    This allows us to forcefully recall subrecords registered by samba processes
1822    on banned and stopped nodes.
1823 */
1824 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1825 {
1826         struct ctdb_client *client;
1827
1828         client = ctdb_find_client_by_pid(ctdb, pid);
1829         if (client == NULL) {
1830                 return -1;
1831         }
1832
1833         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1834                 DEBUG(DEBUG_NOTICE,
1835                       ("Killing client with pid:%d on banned/stopped node\n",
1836                        (int)pid));
1837                 talloc_free(client);
1838                 return -1;
1839         }
1840
1841         return kill(pid, 0);
1842 }
1843
1844 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1845                                      TDB_DATA indata)
1846 {
1847         struct ctdb_client_pid_list *client_pid;
1848         pid_t pid;
1849         uint64_t srvid;
1850         int ret;
1851
1852         pid = *(pid_t *)indata.dptr;
1853         srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1854
1855         for (client_pid = ctdb->client_pids;
1856              client_pid != NULL;
1857              client_pid = client_pid->next) {
1858                 if (client_pid->pid == pid) {
1859                         ret = srvid_exists(ctdb->srv, srvid,
1860                                            client_pid->client);
1861                         if (ret == 0) {
1862                                 return 0;
1863                         }
1864                 }
1865         }
1866
1867         return -1;
1868 }
1869
1870 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1871 {
1872         struct ctdb_node_map_old *node_map = NULL;
1873
1874         CHECK_CONTROL_DATA_SIZE(0);
1875
1876         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1877         if (node_map == NULL) {
1878                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1879                 return -1;
1880         }
1881
1882         outdata->dptr  = (unsigned char *)node_map;
1883         outdata->dsize = talloc_get_size(outdata->dptr);
1884
1885         return 0;
1886 }
1887
1888 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1889 {
1890         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1891                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1892                 return;
1893         }
1894
1895         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1896         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1897         ctdb_stop_recoverd(ctdb);
1898         ctdb_stop_keepalive(ctdb);
1899         ctdb_stop_monitoring(ctdb);
1900         ctdb_release_all_ips(ctdb);
1901         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1902         ctdb_stop_eventd(ctdb);
1903         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1904                 ctdb->methods->shutdown(ctdb);
1905         }
1906
1907         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1908         exit(exit_code);
1909 }
1910
1911 /* When forking the main daemon and the child process needs to connect
1912  * back to the daemon as a client process, this function can be used
1913  * to change the ctdb context from daemon into client mode.  The child
1914  * process must be created using ctdb_fork() and not fork() -
1915  * ctdb_fork() does some necessary housekeeping.
1916  */
1917 int switch_from_server_to_client(struct ctdb_context *ctdb)
1918 {
1919         int ret;
1920
1921         /* get a new event context */
1922         ctdb->ev = tevent_context_init(ctdb);
1923         if (ctdb->ev == NULL) {
1924                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1925                 exit(1);
1926         }
1927         tevent_loop_allow_nesting(ctdb->ev);
1928
1929         /* Connect to main CTDB daemon */
1930         ret = ctdb_socket_connect(ctdb);
1931         if (ret != 0) {
1932                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1933                 return -1;
1934         }
1935
1936         ctdb->can_send_controls = true;
1937
1938         return 0;
1939 }