ctdbd: When the "setup" event fails log an error and exit, don't abort
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 const char *ctdbd_pidfile = NULL;
41
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
43
44 static void print_exit_message(void)
45 {
46         if (debug_extra != NULL && debug_extra[0] != '\0') {
47                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48         } else {
49                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
50         }
51 }
52
53
54
55 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
56                                   struct timeval t, void *private_data)
57 {
58         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
59
60         if (getpid() != ctdbd_pid) {
61                 return;
62         }
63
64         event_add_timed(ctdb->ev, ctdb, 
65                         timeval_current_ofs(1, 0), 
66                         ctdb_time_tick, ctdb);
67 }
68
69 /* Used to trigger a dummy event once per second, to make
70  * detection of hangs more reliable.
71  */
72 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
73 {
74         event_add_timed(ctdb->ev, ctdb, 
75                         timeval_current_ofs(1, 0), 
76                         ctdb_time_tick, ctdb);
77 }
78
79 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
80 {
81         /* start monitoring for connected/disconnected nodes */
82         ctdb_start_keepalive(ctdb);
83
84         /* start monitoring for node health */
85         ctdb_start_monitoring(ctdb);
86
87         /* start periodic update of tcp tickle lists */
88         ctdb_start_tcp_tickle_update(ctdb);
89
90         /* start listening for recovery daemon pings */
91         ctdb_control_recd_ping(ctdb);
92
93         /* start listening to timer ticks */
94         ctdb_start_time_tickd(ctdb);
95 }
96
97 static void block_signal(int signum)
98 {
99         struct sigaction act;
100
101         memset(&act, 0, sizeof(act));
102
103         act.sa_handler = SIG_IGN;
104         sigemptyset(&act.sa_mask);
105         sigaddset(&act.sa_mask, signum);
106         sigaction(signum, &act, NULL);
107 }
108
109
110 /*
111   send a packet to a client
112  */
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
114 {
115         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116         if (hdr->operation == CTDB_REQ_MESSAGE) {
117                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119                         talloc_free(client);
120                         return -1;
121                 }
122         }
123         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
124 }
125
126 /*
127   message handler for when we are in daemon mode. This redirects the message
128   to the right client
129  */
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
131                                     TDB_DATA data, void *private_data)
132 {
133         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134         struct ctdb_req_message *r;
135         int len;
136
137         /* construct a message to send to the client containing the data */
138         len = offsetof(struct ctdb_req_message, data) + data.dsize;
139         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
140                                len, struct ctdb_req_message);
141         CTDB_NO_MEMORY_VOID(ctdb, r);
142
143         talloc_set_name_const(r, "req_message packet");
144
145         r->srvid         = srvid;
146         r->datalen       = data.dsize;
147         memcpy(&r->data[0], data.dptr, data.dsize);
148
149         daemon_queue_send(client, &r->hdr);
150
151         talloc_free(r);
152 }
153
154 /*
155   this is called when the ctdb daemon received a ctdb request to 
156   set the srvid from the client
157  */
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
159 {
160         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161         int res;
162         if (client == NULL) {
163                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164                 return -1;
165         }
166         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167         if (res != 0) {
168                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
169                          (unsigned long long)srvid));
170         } else {
171                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
172                          (unsigned long long)srvid));
173         }
174
175         return res;
176 }
177
178 /*
179   this is called when the ctdb daemon received a ctdb request to 
180   remove a srvid from the client
181  */
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
183 {
184         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185         if (client == NULL) {
186                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187                 return -1;
188         }
189         return ctdb_deregister_message_handler(ctdb, srvid, client);
190 }
191
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193                         TDB_DATA *outdata)
194 {
195         uint64_t *ids;
196         int i, num_ids;
197         uint8_t *results;
198
199         if ((indata.dsize % sizeof(uint64_t)) != 0) {
200                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201                                   "size=%d\n", (int)indata.dsize));
202                 return -1;
203         }
204
205         ids = (uint64_t *)indata.dptr;
206         num_ids = indata.dsize / 8;
207
208         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209         if (results == NULL) {
210                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211                 return -1;
212         }
213         for (i=0; i<num_ids; i++) {
214                 if (ctdb_check_message_handler(ctdb, ids[i])) {
215                         results[i/8] |= (1 << (i%8));
216                 }
217         }
218         outdata->dptr = (uint8_t *)results;
219         outdata->dsize = talloc_get_size(results);
220         return 0;
221 }
222
223 /*
224   destroy a ctdb_client
225 */
226 static int ctdb_client_destructor(struct ctdb_client *client)
227 {
228         struct ctdb_db_context *ctdb_db;
229
230         ctdb_takeover_client_destructor_hook(client);
231         ctdb_reqid_remove(client->ctdb, client->client_id);
232         client->ctdb->num_clients--;
233
234         if (client->num_persistent_updates != 0) {
235                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
237         }
238         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239         if (ctdb_db) {
240                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241                                   "commit active. Forcing recovery.\n"));
242                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
243
244                 /* legacy trans2 transaction state: */
245                 ctdb_db->transaction_active = false;
246
247                 /*
248                  * trans3 transaction state:
249                  *
250                  * The destructor sets the pointer to NULL.
251                  */
252                 talloc_free(ctdb_db->persistent_state);
253         }
254
255         return 0;
256 }
257
258
259 /*
260   this is called when the ctdb daemon received a ctdb request message
261   from a local client over the unix domain socket
262  */
263 static void daemon_request_message_from_client(struct ctdb_client *client, 
264                                                struct ctdb_req_message *c)
265 {
266         TDB_DATA data;
267         int res;
268
269         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
270                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
271         }
272
273         /* maybe the message is for another client on this node */
274         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
275                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
276                 return;
277         }
278
279         /* its for a remote node */
280         data.dptr = &c->data[0];
281         data.dsize = c->datalen;
282         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
283                                        c->srvid, data);
284         if (res != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
286                          c->hdr.destnode));
287         }
288 }
289
290
291 struct daemon_call_state {
292         struct ctdb_client *client;
293         uint32_t reqid;
294         struct ctdb_call *call;
295         struct timeval start_time;
296
297         /* readonly request ? */
298         uint32_t readonly_fetch;
299         uint32_t client_callid;
300 };
301
302 /* 
303    complete a call from a client 
304 */
305 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
306 {
307         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
308                                                            struct daemon_call_state);
309         struct ctdb_reply_call *r;
310         int res;
311         uint32_t length;
312         struct ctdb_client *client = dstate->client;
313         struct ctdb_db_context *ctdb_db = state->ctdb_db;
314
315         talloc_steal(client, dstate);
316         talloc_steal(dstate, dstate->call);
317
318         res = ctdb_daemon_call_recv(state, dstate->call);
319         if (res != 0) {
320                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
321                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
322
323                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
324                 return;
325         }
326
327         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
328         /* If the client asked for readonly FETCH, we remapped this to 
329            FETCH_WITH_HEADER when calling the daemon. So we must
330            strip the extra header off the reply data before passing
331            it back to the client.
332         */
333         if (dstate->readonly_fetch
334         && dstate->client_callid == CTDB_FETCH_FUNC) {
335                 length -= sizeof(struct ctdb_ltdb_header);
336         }
337
338         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
339                                length, struct ctdb_reply_call);
340         if (r == NULL) {
341                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
342                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
344                 return;
345         }
346         r->hdr.reqid        = dstate->reqid;
347         r->status           = dstate->call->status;
348
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 /* client only asked for a FETCH so we must strip off
352                    the extra ctdb_ltdb header
353                 */
354                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
356         } else {
357                 r->datalen          = dstate->call->reply_data.dsize;
358                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
359         }
360
361         res = daemon_queue_send(client, &r->hdr);
362         if (res == -1) {
363                 /* client is dead - return immediately */
364                 return;
365         }
366         if (res != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
368         }
369         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
370         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
371         talloc_free(dstate);
372 }
373
374 struct ctdb_daemon_packet_wrap {
375         struct ctdb_context *ctdb;
376         uint32_t client_id;
377 };
378
379 /*
380   a wrapper to catch disconnected clients
381  */
382 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
383 {
384         struct ctdb_client *client;
385         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
386                                                             struct ctdb_daemon_packet_wrap);
387         if (w == NULL) {
388                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
389                 return;
390         }
391
392         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
393         if (client == NULL) {
394                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
395                          w->client_id));
396                 talloc_free(w);
397                 return;
398         }
399         talloc_free(w);
400
401         /* process it */
402         daemon_incoming_packet(client, hdr);    
403 }
404
405 struct ctdb_deferred_fetch_call {
406         struct ctdb_deferred_fetch_call *next, *prev;
407         struct ctdb_req_call *c;
408         struct ctdb_daemon_packet_wrap *w;
409 };
410
411 struct ctdb_deferred_fetch_queue {
412         struct ctdb_deferred_fetch_call *deferred_calls;
413 };
414
415 struct ctdb_deferred_requeue {
416         struct ctdb_deferred_fetch_call *dfc;
417         struct ctdb_client *client;
418 };
419
420 /* called from a timer event and starts reprocessing the deferred call.*/
421 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
422                                        struct timeval t, void *private_data)
423 {
424         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
425         struct ctdb_client *client = dfr->client;
426
427         talloc_steal(client, dfr->dfc->c);
428         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
429         talloc_free(dfr);
430 }
431
432 /* the referral context is destroyed either after a timeout or when the initial
433    fetch-lock has finished.
434    at this stage, immediately start reprocessing the queued up deferred
435    calls so they get reprocessed immediately (and since we are dmaster at
436    this stage, trigger the waiting smbd processes to pick up and aquire the
437    record right away.
438 */
439 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
440 {
441
442         /* need to reprocess the packets from the queue explicitely instead of
443            just using a normal destructor since we want, need, to
444            call the clients in the same oder as the requests queued up
445         */
446         while (dfq->deferred_calls != NULL) {
447                 struct ctdb_client *client;
448                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
449                 struct ctdb_deferred_requeue *dfr;
450
451                 DLIST_REMOVE(dfq->deferred_calls, dfc);
452
453                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
454                 if (client == NULL) {
455                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
456                                  dfc->w->client_id));
457                         continue;
458                 }
459
460                 /* process it by pushing it back onto the eventloop */
461                 dfr = talloc(client, struct ctdb_deferred_requeue);
462                 if (dfr == NULL) {
463                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
464                         continue;
465                 }
466
467                 dfr->dfc    = talloc_steal(dfr, dfc);
468                 dfr->client = client;
469
470                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
471         }
472
473         return 0;
474 }
475
476 /* insert the new deferral context into the rb tree.
477    there should never be a pre-existing context here, but check for it
478    warn and destroy the previous context if there is already a deferral context
479    for this key.
480 */
481 static void *insert_dfq_callback(void *parm, void *data)
482 {
483         if (data) {
484                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
485                 talloc_free(data);
486         }
487         return parm;
488 }
489
490 /* if the original fetch-lock did not complete within a reasonable time,
491    free the context and context for all deferred requests to cause them to be
492    re-inserted into the event system.
493 */
494 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
495                                   struct timeval t, void *private_data)
496 {
497         talloc_free(private_data);
498 }
499
500 /* This function is used in the local daemon to register a KEY in a database
501    for being "fetched"
502    While the remote fetch is in-flight, any futher attempts to re-fetch the
503    same record will be deferred until the fetch completes.
504 */
505 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
506 {
507         uint32_t *k;
508         struct ctdb_deferred_fetch_queue *dfq;
509
510         k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
511         if (k == NULL) {
512                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
513                 return -1;
514         }
515
516         k[0] = (call->key.dsize + 3) / 4 + 1;
517         memcpy(&k[1], call->key.dptr, call->key.dsize);
518
519         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
520         if (dfq == NULL) {
521                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
522                 talloc_free(k);
523                 return -1;
524         }
525         dfq->deferred_calls = NULL;
526
527         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
528
529         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
530
531         /* if the fetch havent completed in 30 seconds, just tear it all down
532            and let it try again as the events are reissued */
533         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
534
535         talloc_free(k);
536         return 0;
537 }
538
539 /* check if this is a duplicate request to a fetch already in-flight
540    if it is, make this call deferred to be reprocessed later when
541    the in-flight fetch completes.
542 */
543 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
544 {
545         uint32_t *k;
546         struct ctdb_deferred_fetch_queue *dfq;
547         struct ctdb_deferred_fetch_call *dfc;
548
549         k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
550         if (k == NULL) {
551                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
552                 return -1;
553         }
554
555         k[0] = (key.dsize + 3) / 4 + 1;
556         memcpy(&k[1], key.dptr, key.dsize);
557
558         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
559         if (dfq == NULL) {
560                 talloc_free(k);
561                 return -1;
562         }
563
564
565         talloc_free(k);
566
567         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
568         if (dfc == NULL) {
569                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
570                 return -1;
571         }
572
573         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
574         if (dfc->w == NULL) {
575                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
576                 talloc_free(dfc);
577                 return -1;
578         }
579
580         dfc->c = talloc_steal(dfc, c);
581         dfc->w->ctdb = ctdb_db->ctdb;
582         dfc->w->client_id = client->client_id;
583
584         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
585
586         return 0;
587 }
588
589
590 /*
591   this is called when the ctdb daemon received a ctdb request call
592   from a local client over the unix domain socket
593  */
594 static void daemon_request_call_from_client(struct ctdb_client *client, 
595                                             struct ctdb_req_call *c)
596 {
597         struct ctdb_call_state *state;
598         struct ctdb_db_context *ctdb_db;
599         struct daemon_call_state *dstate;
600         struct ctdb_call *call;
601         struct ctdb_ltdb_header header;
602         TDB_DATA key, data;
603         int ret;
604         struct ctdb_context *ctdb = client->ctdb;
605         struct ctdb_daemon_packet_wrap *w;
606
607         CTDB_INCREMENT_STAT(ctdb, total_calls);
608         CTDB_DECREMENT_STAT(ctdb, pending_calls);
609
610         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
611         if (!ctdb_db) {
612                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
613                           c->db_id));
614                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
615                 return;
616         }
617
618         if (ctdb_db->unhealthy_reason) {
619                 /*
620                  * this is just a warning, as the tdb should be empty anyway,
621                  * and only persistent databases can be unhealthy, which doesn't
622                  * use this code patch
623                  */
624                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
625                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
626         }
627
628         key.dptr = c->data;
629         key.dsize = c->keylen;
630
631         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
632         CTDB_NO_MEMORY_VOID(ctdb, w);   
633
634         w->ctdb = ctdb;
635         w->client_id = client->client_id;
636
637         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
638                                            (struct ctdb_req_header *)c, &data,
639                                            daemon_incoming_packet_wrap, w, true);
640         if (ret == -2) {
641                 /* will retry later */
642                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
643                 return;
644         }
645
646         talloc_free(w);
647
648         if (ret != 0) {
649                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
650                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
651                 return;
652         }
653
654
655         /* check if this fetch request is a duplicate for a
656            request we already have in flight. If so defer it until
657            the first request completes.
658         */
659         if (ctdb->tunable.fetch_collapse == 1) {
660                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
661                         ret = ctdb_ltdb_unlock(ctdb_db, key);
662                         if (ret != 0) {
663                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
664                         }
665                         return;
666                 }
667         }
668
669         /* Dont do READONLY if we dont have a tracking database */
670         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
671                 c->flags &= ~CTDB_WANT_READONLY;
672         }
673
674         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
675                 header.flags &= ~CTDB_REC_RO_FLAGS;
676                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
677                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
678                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
679                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
680                 }
681                 /* and clear out the tracking data */
682                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
683                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
684                 }
685         }
686
687         /* if we are revoking, we must defer all other calls until the revoke
688          * had completed.
689          */
690         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
691                 talloc_free(data.dptr);
692                 ret = ctdb_ltdb_unlock(ctdb_db, key);
693
694                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
695                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
696                 }
697                 return;
698         }
699
700         if ((header.dmaster == ctdb->pnn)
701         && (!(c->flags & CTDB_WANT_READONLY))
702         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
703                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
704                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
705                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
706                 }
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
710                         ctdb_fatal(ctdb, "Failed to start record revoke");
711                 }
712                 talloc_free(data.dptr);
713
714                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
715                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
716                 }
717
718                 return;
719         }               
720
721         dstate = talloc(client, struct daemon_call_state);
722         if (dstate == NULL) {
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724                 if (ret != 0) {
725                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
726                 }
727
728                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
729                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
730                 return;
731         }
732         dstate->start_time = timeval_current();
733         dstate->client = client;
734         dstate->reqid  = c->hdr.reqid;
735         talloc_steal(dstate, data.dptr);
736
737         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
738         if (call == NULL) {
739                 ret = ctdb_ltdb_unlock(ctdb_db, key);
740                 if (ret != 0) {
741                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
742                 }
743
744                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
745                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
746                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
747                 return;
748         }
749
750         dstate->readonly_fetch = 0;
751         call->call_id = c->callid;
752         call->key = key;
753         call->call_data.dptr = c->data + c->keylen;
754         call->call_data.dsize = c->calldatalen;
755         call->flags = c->flags;
756
757         if (c->flags & CTDB_WANT_READONLY) {
758                 /* client wants readonly record, so translate this into a 
759                    fetch with header. remember what the client asked for
760                    so we can remap the reply back to the proper format for
761                    the client in the reply
762                  */
763                 dstate->client_callid = call->call_id;
764                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
765                 dstate->readonly_fetch = 1;
766         }
767
768         if (header.dmaster == ctdb->pnn) {
769                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
770         } else {
771                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
772                 if (ctdb->tunable.fetch_collapse == 1) {
773                         /* This request triggered a remote fetch-lock.
774                            set up a deferral for this key so any additional
775                            fetch-locks are deferred until the current one
776                            finishes.
777                          */
778                         setup_deferred_fetch_locks(ctdb_db, call);
779                 }
780         }
781
782         ret = ctdb_ltdb_unlock(ctdb_db, key);
783         if (ret != 0) {
784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
785         }
786
787         if (state == NULL) {
788                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
789                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
790                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
791                 return;
792         }
793         talloc_steal(state, dstate);
794         talloc_steal(client, state);
795
796         state->async.fn = daemon_call_from_client_callback;
797         state->async.private_data = dstate;
798 }
799
800
801 static void daemon_request_control_from_client(struct ctdb_client *client, 
802                                                struct ctdb_req_control *c);
803
804 /* data contains a packet from the client */
805 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
806 {
807         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
808         TALLOC_CTX *tmp_ctx;
809         struct ctdb_context *ctdb = client->ctdb;
810
811         /* place the packet as a child of a tmp_ctx. We then use
812            talloc_free() below to free it. If any of the calls want
813            to keep it, then they will steal it somewhere else, and the
814            talloc_free() will be a no-op */
815         tmp_ctx = talloc_new(client);
816         talloc_steal(tmp_ctx, hdr);
817
818         if (hdr->ctdb_magic != CTDB_MAGIC) {
819                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
820                 goto done;
821         }
822
823         if (hdr->ctdb_version != CTDB_VERSION) {
824                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
825                 goto done;
826         }
827
828         switch (hdr->operation) {
829         case CTDB_REQ_CALL:
830                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
831                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
832                 break;
833
834         case CTDB_REQ_MESSAGE:
835                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
836                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
837                 break;
838
839         case CTDB_REQ_CONTROL:
840                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
841                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
842                 break;
843
844         default:
845                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
846                          hdr->operation));
847         }
848
849 done:
850         talloc_free(tmp_ctx);
851 }
852
853 /*
854   called when the daemon gets a incoming packet
855  */
856 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
857 {
858         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
859         struct ctdb_req_header *hdr;
860
861         if (cnt == 0) {
862                 talloc_free(client);
863                 return;
864         }
865
866         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
867
868         if (cnt < sizeof(*hdr)) {
869                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
870                                (unsigned)cnt);
871                 return;
872         }
873         hdr = (struct ctdb_req_header *)data;
874         if (cnt != hdr->length) {
875                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
876                                (unsigned)hdr->length, (unsigned)cnt);
877                 return;
878         }
879
880         if (hdr->ctdb_magic != CTDB_MAGIC) {
881                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
882                 return;
883         }
884
885         if (hdr->ctdb_version != CTDB_VERSION) {
886                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
887                 return;
888         }
889
890         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
891                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
892                  hdr->srcnode, hdr->destnode));
893
894         /* it is the responsibility of the incoming packet function to free 'data' */
895         daemon_incoming_packet(client, hdr);
896 }
897
898
899 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
900 {
901         if (client_pid->ctdb->client_pids != NULL) {
902                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
903         }
904
905         return 0;
906 }
907
908
909 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
910                          uint16_t flags, void *private_data)
911 {
912         struct sockaddr_un addr;
913         socklen_t len;
914         int fd;
915         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
916         struct ctdb_client *client;
917         struct ctdb_client_pid_list *client_pid;
918         pid_t peer_pid = 0;
919
920         memset(&addr, 0, sizeof(addr));
921         len = sizeof(addr);
922         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
923         if (fd == -1) {
924                 return;
925         }
926
927         set_nonblocking(fd);
928         set_close_on_exec(fd);
929
930         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
931
932         client = talloc_zero(ctdb, struct ctdb_client);
933         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
934                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
935         }
936
937         client->ctdb = ctdb;
938         client->fd = fd;
939         client->client_id = ctdb_reqid_new(ctdb, client);
940         client->pid = peer_pid;
941
942         client_pid = talloc(client, struct ctdb_client_pid_list);
943         if (client_pid == NULL) {
944                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
945                 close(fd);
946                 talloc_free(client);
947                 return;
948         }               
949         client_pid->ctdb   = ctdb;
950         client_pid->pid    = peer_pid;
951         client_pid->client = client;
952
953         DLIST_ADD(ctdb->client_pids, client_pid);
954
955         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
956                                          ctdb_daemon_read_cb, client,
957                                          "client-%u", client->pid);
958
959         talloc_set_destructor(client, ctdb_client_destructor);
960         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
961         ctdb->num_clients++;
962 }
963
964
965
966 /*
967   create a unix domain socket and bind it
968   return a file descriptor open on the socket 
969 */
970 static int ux_socket_bind(struct ctdb_context *ctdb)
971 {
972         struct sockaddr_un addr;
973
974         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
975         if (ctdb->daemon.sd == -1) {
976                 return -1;
977         }
978
979         set_close_on_exec(ctdb->daemon.sd);
980         set_nonblocking(ctdb->daemon.sd);
981
982         memset(&addr, 0, sizeof(addr));
983         addr.sun_family = AF_UNIX;
984         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
985
986         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
987                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
988                 goto failed;
989         }       
990
991         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
992             chmod(ctdb->daemon.name, 0700) != 0) {
993                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
994                 goto failed;
995         } 
996
997
998         if (listen(ctdb->daemon.sd, 100) != 0) {
999                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1000                 goto failed;
1001         }
1002
1003         return 0;
1004
1005 failed:
1006         close(ctdb->daemon.sd);
1007         ctdb->daemon.sd = -1;
1008         return -1;      
1009 }
1010
1011 static void initialise_node_flags (struct ctdb_context *ctdb)
1012 {
1013         if (ctdb->pnn == -1) {
1014                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1015         }
1016
1017         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1018
1019         /* do we start out in DISABLED mode? */
1020         if (ctdb->start_as_disabled != 0) {
1021                 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1022                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1023         }
1024         /* do we start out in STOPPED mode? */
1025         if (ctdb->start_as_stopped != 0) {
1026                 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1027                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1028         }
1029 }
1030
1031 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1032                                       void *private_data)
1033 {
1034         if (status != 0) {
1035                 DEBUG(DEBUG_ALERT,("Failed to run setup event - exiting\n"));
1036                 exit(1);
1037         }
1038         ctdb_run_notification_script(ctdb, "setup");
1039
1040         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_FIRST_RECOVERY);
1041
1042         /* tell all other nodes we've just started up */
1043         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1044                                  0, CTDB_CONTROL_STARTUP, 0,
1045                                  CTDB_CTRL_FLAG_NOREPLY,
1046                                  tdb_null, NULL, NULL);
1047
1048         /* Start the recovery daemon */
1049         if (ctdb_start_recoverd(ctdb) != 0) {
1050                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1051                 exit(11);
1052         }
1053
1054         ctdb_start_periodic_events(ctdb);
1055 }
1056
1057 static struct timeval tevent_before_wait_ts;
1058 static struct timeval tevent_after_wait_ts;
1059
1060 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1061                               void *private_data)
1062 {
1063         struct timeval diff;
1064         struct timeval now;
1065
1066         if (getpid() != ctdbd_pid) {
1067                 return;
1068         }
1069
1070         now = timeval_current();
1071
1072         switch (tp) {
1073         case TEVENT_TRACE_BEFORE_WAIT:
1074                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1075                         diff = timeval_until(&tevent_after_wait_ts, &now);
1076                         if (diff.tv_sec > 3) {
1077                                 DEBUG(DEBUG_ERR,
1078                                       ("Handling event took %ld seconds!\n",
1079                                        diff.tv_sec));
1080                         }
1081                 }
1082                 tevent_before_wait_ts = now;
1083                 break;
1084
1085         case TEVENT_TRACE_AFTER_WAIT:
1086                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1087                         diff = timeval_until(&tevent_before_wait_ts, &now);
1088                         if (diff.tv_sec > 3) {
1089                                 DEBUG(DEBUG_CRIT,
1090                                       ("No event for %ld seconds!\n",
1091                                        diff.tv_sec));
1092                         }
1093                 }
1094                 tevent_after_wait_ts = now;
1095                 break;
1096
1097         default:
1098                 /* Do nothing for future tevent trace points */ ;
1099         }
1100 }
1101
1102 static void ctdb_remove_pidfile(void)
1103 {
1104         if (ctdbd_pidfile != NULL && !ctdb_is_child_process()) {
1105                 if (unlink(ctdbd_pidfile) == 0) {
1106                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1107                                              ctdbd_pidfile));
1108                 } else {
1109                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1110                                               ctdbd_pidfile));
1111                 }
1112         }
1113 }
1114
1115 static void ctdb_create_pidfile(pid_t pid)
1116 {
1117         if (ctdbd_pidfile != NULL) {
1118                 FILE *fp;
1119
1120                 fp = fopen(ctdbd_pidfile, "w");
1121                 if (fp == NULL) {
1122                         DEBUG(DEBUG_ALERT,
1123                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1124                         exit(11);
1125                 }
1126
1127                 fprintf(fp, "%d\n", pid);
1128                 fclose(fp);
1129                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1130                 atexit(ctdb_remove_pidfile);
1131         }
1132 }
1133
1134 /*
1135   start the protocol going as a daemon
1136 */
1137 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
1138 {
1139         int res, ret = -1;
1140         struct fd_event *fde;
1141         const char *domain_socket_name;
1142
1143         /* get rid of any old sockets */
1144         unlink(ctdb->daemon.name);
1145
1146         /* create a unix domain stream socket to listen to */
1147         res = ux_socket_bind(ctdb);
1148         if (res!=0) {
1149                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
1150                 exit(10);
1151         }
1152
1153         if (do_fork && fork()) {
1154                 return 0;
1155         }
1156
1157         tdb_reopen_all(false);
1158
1159         if (do_fork) {
1160                 setsid();
1161                 close(0);
1162                 if (open("/dev/null", O_RDONLY) != 0) {
1163                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1164                         exit(11);
1165                 }
1166         }
1167         block_signal(SIGPIPE);
1168
1169         ctdbd_pid = getpid();
1170         ctdb->ctdbd_pid = ctdbd_pid;
1171         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1172                           CTDB_VERSION_STRING, ctdbd_pid));
1173         ctdb_create_pidfile(ctdb->ctdbd_pid);
1174
1175         if (ctdb->do_setsched) {
1176                 /* try to set us up as realtime */
1177                 ctdb_set_scheduler(ctdb);
1178         }
1179
1180         /* ensure the socket is deleted on exit of the daemon */
1181         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1182         if (domain_socket_name == NULL) {
1183                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1184                 exit(12);
1185         }
1186
1187         ctdb->ev = event_context_init(NULL);
1188         tevent_loop_allow_nesting(ctdb->ev);
1189         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, NULL);
1190         ret = ctdb_init_tevent_logging(ctdb);
1191         if (ret != 0) {
1192                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1193                 exit(1);
1194         }
1195
1196         /* set up a handler to pick up sigchld */
1197         if (ctdb_init_sigchld(ctdb) == NULL) {
1198                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1199                 exit(1);
1200         }
1201
1202         ctdb_set_child_logging(ctdb);
1203         if (use_syslog) {
1204                 if (start_syslog_daemon(ctdb)) {
1205                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1206                         exit(10);
1207                 }
1208         }
1209
1210         /* initialize statistics collection */
1211         ctdb_statistics_init(ctdb);
1212
1213         /* force initial recovery for election */
1214         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1215
1216         if (strcmp(ctdb->transport, "tcp") == 0) {
1217                 int ctdb_tcp_init(struct ctdb_context *);
1218                 ret = ctdb_tcp_init(ctdb);
1219         }
1220 #ifdef USE_INFINIBAND
1221         if (strcmp(ctdb->transport, "ib") == 0) {
1222                 int ctdb_ibw_init(struct ctdb_context *);
1223                 ret = ctdb_ibw_init(ctdb);
1224         }
1225 #endif
1226         if (ret != 0) {
1227                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1228                 return -1;
1229         }
1230
1231         if (ctdb->methods == NULL) {
1232                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1233                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1234         }
1235
1236         /* initialise the transport  */
1237         if (ctdb->methods->initialise(ctdb) != 0) {
1238                 ctdb_fatal(ctdb, "transport failed to initialise");
1239         }
1240
1241         initialise_node_flags(ctdb);
1242
1243         if (public_address_list) {
1244                 ctdb->public_addresses_file = public_address_list;
1245                 ret = ctdb_set_public_addresses(ctdb, true);
1246                 if (ret == -1) {
1247                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1248                         exit(1);
1249                 }
1250                 if (ctdb->do_checkpublicip) {
1251                         ctdb_start_monitoring_interfaces(ctdb);
1252                 }
1253         }
1254
1255
1256         /* attach to existing databases */
1257         if (ctdb_attach_databases(ctdb) != 0) {
1258                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1259         }
1260
1261         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1262         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1263         if (ret != 0) {
1264                 ctdb_fatal(ctdb, "Failed to run init event\n");
1265         }
1266         ctdb_run_notification_script(ctdb, "init");
1267
1268         /* start frozen, then let the first election sort things out */
1269         if (!ctdb_blocking_freeze(ctdb)) {
1270                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1271         }
1272
1273         /* now start accepting clients, only can do this once frozen */
1274         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1275                            EVENT_FD_READ,
1276                            ctdb_accept_client, ctdb);
1277         tevent_fd_set_auto_close(fde);
1278
1279         /* release any IPs we hold from previous runs of the daemon */
1280         if (ctdb->tunable.disable_ip_failover == 0) {
1281                 ctdb_release_all_ips(ctdb);
1282         }
1283
1284
1285         /* Make sure we log something when the daemon terminates */
1286         atexit(print_exit_message);
1287
1288         /* Start the transport */
1289         if (ctdb->methods->start(ctdb) != 0) {
1290                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1291                 ctdb_fatal(ctdb, "transport failed to start");
1292         }
1293
1294         /* Recovery daemon and timed events are started from the
1295          * callback, only after the setup event completes
1296          * successfully.
1297          */
1298         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1299         ret = ctdb_event_script_callback(ctdb,
1300                                          ctdb,
1301                                          ctdb_setup_event_callback,
1302                                          ctdb,
1303                                          false,
1304                                          CTDB_EVENT_SETUP,
1305                                          "%s",
1306                                          "");
1307         if (ret != 0) {
1308                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1309                 exit(1);
1310         }
1311
1312         ctdb_lockdown_memory(ctdb);
1313           
1314         /* go into a wait loop to allow other nodes to complete */
1315         event_loop_wait(ctdb->ev);
1316
1317         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1318         exit(1);
1319 }
1320
1321 /*
1322   allocate a packet for use in daemon<->daemon communication
1323  */
1324 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1325                                                  TALLOC_CTX *mem_ctx, 
1326                                                  enum ctdb_operation operation, 
1327                                                  size_t length, size_t slength,
1328                                                  const char *type)
1329 {
1330         int size;
1331         struct ctdb_req_header *hdr;
1332
1333         length = MAX(length, slength);
1334         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1335
1336         if (ctdb->methods == NULL) {
1337                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1338                          operation, (unsigned)length));
1339                 return NULL;
1340         }
1341
1342         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1343         if (hdr == NULL) {
1344                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1345                          operation, (unsigned)length));
1346                 return NULL;
1347         }
1348         talloc_set_name_const(hdr, type);
1349         memset(hdr, 0, slength);
1350         hdr->length       = length;
1351         hdr->operation    = operation;
1352         hdr->ctdb_magic   = CTDB_MAGIC;
1353         hdr->ctdb_version = CTDB_VERSION;
1354         hdr->generation   = ctdb->vnn_map->generation;
1355         hdr->srcnode      = ctdb->pnn;
1356
1357         return hdr;     
1358 }
1359
1360 struct daemon_control_state {
1361         struct daemon_control_state *next, *prev;
1362         struct ctdb_client *client;
1363         struct ctdb_req_control *c;
1364         uint32_t reqid;
1365         struct ctdb_node *node;
1366 };
1367
1368 /*
1369   callback when a control reply comes in
1370  */
1371 static void daemon_control_callback(struct ctdb_context *ctdb,
1372                                     int32_t status, TDB_DATA data, 
1373                                     const char *errormsg,
1374                                     void *private_data)
1375 {
1376         struct daemon_control_state *state = talloc_get_type(private_data, 
1377                                                              struct daemon_control_state);
1378         struct ctdb_client *client = state->client;
1379         struct ctdb_reply_control *r;
1380         size_t len;
1381         int ret;
1382
1383         /* construct a message to send to the client containing the data */
1384         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1385         if (errormsg) {
1386                 len += strlen(errormsg);
1387         }
1388         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1389                                struct ctdb_reply_control);
1390         CTDB_NO_MEMORY_VOID(ctdb, r);
1391
1392         r->hdr.reqid     = state->reqid;
1393         r->status        = status;
1394         r->datalen       = data.dsize;
1395         r->errorlen = 0;
1396         memcpy(&r->data[0], data.dptr, data.dsize);
1397         if (errormsg) {
1398                 r->errorlen = strlen(errormsg);
1399                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1400         }
1401
1402         ret = daemon_queue_send(client, &r->hdr);
1403         if (ret != -1) {
1404                 talloc_free(state);
1405         }
1406 }
1407
1408 /*
1409   fail all pending controls to a disconnected node
1410  */
1411 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1412 {
1413         struct daemon_control_state *state;
1414         while ((state = node->pending_controls)) {
1415                 DLIST_REMOVE(node->pending_controls, state);
1416                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1417                                         "node is disconnected", state);
1418         }
1419 }
1420
1421 /*
1422   destroy a daemon_control_state
1423  */
1424 static int daemon_control_destructor(struct daemon_control_state *state)
1425 {
1426         if (state->node) {
1427                 DLIST_REMOVE(state->node->pending_controls, state);
1428         }
1429         return 0;
1430 }
1431
1432 /*
1433   this is called when the ctdb daemon received a ctdb request control
1434   from a local client over the unix domain socket
1435  */
1436 static void daemon_request_control_from_client(struct ctdb_client *client, 
1437                                                struct ctdb_req_control *c)
1438 {
1439         TDB_DATA data;
1440         int res;
1441         struct daemon_control_state *state;
1442         TALLOC_CTX *tmp_ctx = talloc_new(client);
1443
1444         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1445                 c->hdr.destnode = client->ctdb->pnn;
1446         }
1447
1448         state = talloc(client, struct daemon_control_state);
1449         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1450
1451         state->client = client;
1452         state->c = talloc_steal(state, c);
1453         state->reqid = c->hdr.reqid;
1454         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1455                 state->node = client->ctdb->nodes[c->hdr.destnode];
1456                 DLIST_ADD(state->node->pending_controls, state);
1457         } else {
1458                 state->node = NULL;
1459         }
1460
1461         talloc_set_destructor(state, daemon_control_destructor);
1462
1463         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1464                 talloc_steal(tmp_ctx, state);
1465         }
1466         
1467         data.dptr = &c->data[0];
1468         data.dsize = c->datalen;
1469         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1470                                        c->srvid, c->opcode, client->client_id,
1471                                        c->flags,
1472                                        data, daemon_control_callback,
1473                                        state);
1474         if (res != 0) {
1475                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1476                          c->hdr.destnode));
1477         }
1478
1479         talloc_free(tmp_ctx);
1480 }
1481
1482 /*
1483   register a call function
1484 */
1485 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1486                          ctdb_fn_t fn, int id)
1487 {
1488         struct ctdb_registered_call *call;
1489         struct ctdb_db_context *ctdb_db;
1490
1491         ctdb_db = find_ctdb_db(ctdb, db_id);
1492         if (ctdb_db == NULL) {
1493                 return -1;
1494         }
1495
1496         call = talloc(ctdb_db, struct ctdb_registered_call);
1497         call->fn = fn;
1498         call->id = id;
1499
1500         DLIST_ADD(ctdb_db->calls, call);        
1501         return 0;
1502 }
1503
1504
1505
1506 /*
1507   this local messaging handler is ugly, but is needed to prevent
1508   recursion in ctdb_send_message() when the destination node is the
1509   same as the source node
1510  */
1511 struct ctdb_local_message {
1512         struct ctdb_context *ctdb;
1513         uint64_t srvid;
1514         TDB_DATA data;
1515 };
1516
1517 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1518                                        struct timeval t, void *private_data)
1519 {
1520         struct ctdb_local_message *m = talloc_get_type(private_data, 
1521                                                        struct ctdb_local_message);
1522         int res;
1523
1524         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1525         if (res != 0) {
1526                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1527                           (unsigned long long)m->srvid));
1528         }
1529         talloc_free(m);
1530 }
1531
1532 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1533 {
1534         struct ctdb_local_message *m;
1535         m = talloc(ctdb, struct ctdb_local_message);
1536         CTDB_NO_MEMORY(ctdb, m);
1537
1538         m->ctdb = ctdb;
1539         m->srvid = srvid;
1540         m->data  = data;
1541         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1542         if (m->data.dptr == NULL) {
1543                 talloc_free(m);
1544                 return -1;
1545         }
1546
1547         /* this needs to be done as an event to prevent recursion */
1548         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1549         return 0;
1550 }
1551
1552 /*
1553   send a ctdb message
1554 */
1555 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1556                              uint64_t srvid, TDB_DATA data)
1557 {
1558         struct ctdb_req_message *r;
1559         int len;
1560
1561         if (ctdb->methods == NULL) {
1562                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1563                 return -1;
1564         }
1565
1566         /* see if this is a message to ourselves */
1567         if (pnn == ctdb->pnn) {
1568                 return ctdb_local_message(ctdb, srvid, data);
1569         }
1570
1571         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1572         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1573                                     struct ctdb_req_message);
1574         CTDB_NO_MEMORY(ctdb, r);
1575
1576         r->hdr.destnode  = pnn;
1577         r->srvid         = srvid;
1578         r->datalen       = data.dsize;
1579         memcpy(&r->data[0], data.dptr, data.dsize);
1580
1581         ctdb_queue_packet(ctdb, &r->hdr);
1582
1583         talloc_free(r);
1584         return 0;
1585 }
1586
1587
1588
1589 struct ctdb_client_notify_list {
1590         struct ctdb_client_notify_list *next, *prev;
1591         struct ctdb_context *ctdb;
1592         uint64_t srvid;
1593         TDB_DATA data;
1594 };
1595
1596
1597 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1598 {
1599         int ret;
1600
1601         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1602
1603         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1604         if (ret != 0) {
1605                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1606         }
1607
1608         return 0;
1609 }
1610
1611 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1612 {
1613         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1614         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1615         struct ctdb_client_notify_list *nl;
1616
1617         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1618
1619         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1620                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1621                 return -1;
1622         }
1623
1624         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1625                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1626                 return -1;
1627         }
1628
1629
1630         if (client == NULL) {
1631                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1632                 return -1;
1633         }
1634
1635         for(nl=client->notify; nl; nl=nl->next) {
1636                 if (nl->srvid == notify->srvid) {
1637                         break;
1638                 }
1639         }
1640         if (nl != NULL) {
1641                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1642                 return -1;
1643         }
1644
1645         nl = talloc(client, struct ctdb_client_notify_list);
1646         CTDB_NO_MEMORY(ctdb, nl);
1647         nl->ctdb       = ctdb;
1648         nl->srvid      = notify->srvid;
1649         nl->data.dsize = notify->len;
1650         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1651         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1652         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1653         
1654         DLIST_ADD(client->notify, nl);
1655         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1656
1657         return 0;
1658 }
1659
1660 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1661 {
1662         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1663         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1664         struct ctdb_client_notify_list *nl;
1665
1666         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1667
1668         if (client == NULL) {
1669                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1670                 return -1;
1671         }
1672
1673         for(nl=client->notify; nl; nl=nl->next) {
1674                 if (nl->srvid == notify->srvid) {
1675                         break;
1676                 }
1677         }
1678         if (nl == NULL) {
1679                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1680                 return -1;
1681         }
1682
1683         DLIST_REMOVE(client->notify, nl);
1684         talloc_set_destructor(nl, NULL);
1685         talloc_free(nl);
1686
1687         return 0;
1688 }
1689
1690 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1691 {
1692         struct ctdb_client_pid_list *client_pid;
1693
1694         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1695                 if (client_pid->pid == pid) {
1696                         return client_pid->client;
1697                 }
1698         }
1699         return NULL;
1700 }
1701
1702
1703 /* This control is used by samba when probing if a process (of a samba daemon)
1704    exists on the node.
1705    Samba does this when it needs/wants to check if a subrecord in one of the
1706    databases is still valied, or if it is stale and can be removed.
1707    If the node is in unhealthy or stopped state we just kill of the samba
1708    process holding htis sub-record and return to the calling samba that
1709    the process does not exist.
1710    This allows us to forcefully recall subrecords registered by samba processes
1711    on banned and stopped nodes.
1712 */
1713 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1714 {
1715         struct ctdb_client *client;
1716
1717         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1718                 client = ctdb_find_client_by_pid(ctdb, pid);
1719                 if (client != NULL) {
1720                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1721                         talloc_free(client);
1722                 }
1723                 return -1;
1724         }
1725
1726         return kill(pid, 0);
1727 }