0f17b0f46812795a1020c53cf68b55e55fca622c
[sharpe/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/blocking.h"
37
38 #include "ctdb_version.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
41
42 #include "common/rb_tree.h"
43 #include "common/reqid.h"
44 #include "common/system.h"
45 #include "common/common.h"
46 #include "common/logging.h"
47
48 struct ctdb_client_pid_list {
49         struct ctdb_client_pid_list *next, *prev;
50         struct ctdb_context *ctdb;
51         pid_t pid;
52         struct ctdb_client *client;
53 };
54
55 const char *ctdbd_pidfile = NULL;
56
57 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
58
59 static void print_exit_message(void)
60 {
61         if (debug_extra != NULL && debug_extra[0] != '\0') {
62                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
63         } else {
64                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
65
66                 /* Wait a second to allow pending log messages to be flushed */
67                 sleep(1);
68         }
69 }
70
71
72
73 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
74                                   struct timeval t, void *private_data)
75 {
76         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
77
78         if (getpid() != ctdb->ctdbd_pid) {
79                 return;
80         }
81
82         tevent_add_timer(ctdb->ev, ctdb,
83                          timeval_current_ofs(1, 0),
84                          ctdb_time_tick, ctdb);
85 }
86
87 /* Used to trigger a dummy event once per second, to make
88  * detection of hangs more reliable.
89  */
90 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
91 {
92         tevent_add_timer(ctdb->ev, ctdb,
93                          timeval_current_ofs(1, 0),
94                          ctdb_time_tick, ctdb);
95 }
96
97 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
98 {
99         /* start monitoring for connected/disconnected nodes */
100         ctdb_start_keepalive(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void ignore_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
146                                    void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message_old *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
154         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
155                                len, struct ctdb_req_message_old);
156         CTDB_NO_MEMORY_VOID(client->ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
182                              client);
183         if (res != 0) {
184                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
185                          (unsigned long long)srvid));
186         } else {
187                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
188                          (unsigned long long)srvid));
189         }
190
191         return res;
192 }
193
194 /*
195   this is called when the ctdb daemon received a ctdb request to 
196   remove a srvid from the client
197  */
198 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 {
200         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
201         if (client == NULL) {
202                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
203                 return -1;
204         }
205         return srvid_deregister(ctdb->srv, srvid, client);
206 }
207
208 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
209                         TDB_DATA *outdata)
210 {
211         uint64_t *ids;
212         int i, num_ids;
213         uint8_t *results;
214
215         if ((indata.dsize % sizeof(uint64_t)) != 0) {
216                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
217                                   "size=%d\n", (int)indata.dsize));
218                 return -1;
219         }
220
221         ids = (uint64_t *)indata.dptr;
222         num_ids = indata.dsize / 8;
223
224         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
225         if (results == NULL) {
226                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
227                 return -1;
228         }
229         for (i=0; i<num_ids; i++) {
230                 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
231                         results[i/8] |= (1 << (i%8));
232                 }
233         }
234         outdata->dptr = (uint8_t *)results;
235         outdata->dsize = talloc_get_size(results);
236         return 0;
237 }
238
239 /*
240   destroy a ctdb_client
241 */
242 static int ctdb_client_destructor(struct ctdb_client *client)
243 {
244         struct ctdb_db_context *ctdb_db;
245
246         ctdb_takeover_client_destructor_hook(client);
247         reqid_remove(client->ctdb->idr, client->client_id);
248         client->ctdb->num_clients--;
249
250         if (client->num_persistent_updates != 0) {
251                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
252                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
253         }
254         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
255         if (ctdb_db) {
256                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
257                                   "commit active. Forcing recovery.\n"));
258                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
259
260                 /*
261                  * trans3 transaction state:
262                  *
263                  * The destructor sets the pointer to NULL.
264                  */
265                 talloc_free(ctdb_db->persistent_state);
266         }
267
268         return 0;
269 }
270
271
272 /*
273   this is called when the ctdb daemon received a ctdb request message
274   from a local client over the unix domain socket
275  */
276 static void daemon_request_message_from_client(struct ctdb_client *client, 
277                                                struct ctdb_req_message_old *c)
278 {
279         TDB_DATA data;
280         int res;
281
282         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
283                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
284         }
285
286         /* maybe the message is for another client on this node */
287         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
288                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
289                 return;
290         }
291
292         /* its for a remote node */
293         data.dptr = &c->data[0];
294         data.dsize = c->datalen;
295         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
296                                        c->srvid, data);
297         if (res != 0) {
298                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
299                          c->hdr.destnode));
300         }
301 }
302
303
304 struct daemon_call_state {
305         struct ctdb_client *client;
306         uint32_t reqid;
307         struct ctdb_call *call;
308         struct timeval start_time;
309
310         /* readonly request ? */
311         uint32_t readonly_fetch;
312         uint32_t client_callid;
313 };
314
315 /* 
316    complete a call from a client 
317 */
318 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
319 {
320         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
321                                                            struct daemon_call_state);
322         struct ctdb_reply_call_old *r;
323         int res;
324         uint32_t length;
325         struct ctdb_client *client = dstate->client;
326         struct ctdb_db_context *ctdb_db = state->ctdb_db;
327
328         talloc_steal(client, dstate);
329         talloc_steal(dstate, dstate->call);
330
331         res = ctdb_daemon_call_recv(state, dstate->call);
332         if (res != 0) {
333                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
334                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
335
336                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
337                 return;
338         }
339
340         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
341         /* If the client asked for readonly FETCH, we remapped this to 
342            FETCH_WITH_HEADER when calling the daemon. So we must
343            strip the extra header off the reply data before passing
344            it back to the client.
345         */
346         if (dstate->readonly_fetch
347         && dstate->client_callid == CTDB_FETCH_FUNC) {
348                 length -= sizeof(struct ctdb_ltdb_header);
349         }
350
351         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
352                                length, struct ctdb_reply_call_old);
353         if (r == NULL) {
354                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
355                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
356                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
357                 return;
358         }
359         r->hdr.reqid        = dstate->reqid;
360         r->status           = dstate->call->status;
361
362         if (dstate->readonly_fetch
363         && dstate->client_callid == CTDB_FETCH_FUNC) {
364                 /* client only asked for a FETCH so we must strip off
365                    the extra ctdb_ltdb header
366                 */
367                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
368                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
369         } else {
370                 r->datalen          = dstate->call->reply_data.dsize;
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
372         }
373
374         res = daemon_queue_send(client, &r->hdr);
375         if (res == -1) {
376                 /* client is dead - return immediately */
377                 return;
378         }
379         if (res != 0) {
380                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
381         }
382         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
383         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
384         talloc_free(dstate);
385 }
386
387 struct ctdb_daemon_packet_wrap {
388         struct ctdb_context *ctdb;
389         uint32_t client_id;
390 };
391
392 /*
393   a wrapper to catch disconnected clients
394  */
395 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
396 {
397         struct ctdb_client *client;
398         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
399                                                             struct ctdb_daemon_packet_wrap);
400         if (w == NULL) {
401                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
402                 return;
403         }
404
405         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
406         if (client == NULL) {
407                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
408                          w->client_id));
409                 talloc_free(w);
410                 return;
411         }
412         talloc_free(w);
413
414         /* process it */
415         daemon_incoming_packet(client, hdr);    
416 }
417
418 struct ctdb_deferred_fetch_call {
419         struct ctdb_deferred_fetch_call *next, *prev;
420         struct ctdb_req_call_old *c;
421         struct ctdb_daemon_packet_wrap *w;
422 };
423
424 struct ctdb_deferred_fetch_queue {
425         struct ctdb_deferred_fetch_call *deferred_calls;
426 };
427
428 struct ctdb_deferred_requeue {
429         struct ctdb_deferred_fetch_call *dfc;
430         struct ctdb_client *client;
431 };
432
433 /* called from a timer event and starts reprocessing the deferred call.*/
434 static void reprocess_deferred_call(struct tevent_context *ev,
435                                     struct tevent_timer *te,
436                                     struct timeval t, void *private_data)
437 {
438         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
439         struct ctdb_client *client = dfr->client;
440
441         talloc_steal(client, dfr->dfc->c);
442         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
443         talloc_free(dfr);
444 }
445
446 /* the referral context is destroyed either after a timeout or when the initial
447    fetch-lock has finished.
448    at this stage, immediately start reprocessing the queued up deferred
449    calls so they get reprocessed immediately (and since we are dmaster at
450    this stage, trigger the waiting smbd processes to pick up and aquire the
451    record right away.
452 */
453 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
454 {
455
456         /* need to reprocess the packets from the queue explicitely instead of
457            just using a normal destructor since we want, need, to
458            call the clients in the same oder as the requests queued up
459         */
460         while (dfq->deferred_calls != NULL) {
461                 struct ctdb_client *client;
462                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
463                 struct ctdb_deferred_requeue *dfr;
464
465                 DLIST_REMOVE(dfq->deferred_calls, dfc);
466
467                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
468                 if (client == NULL) {
469                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
470                                  dfc->w->client_id));
471                         continue;
472                 }
473
474                 /* process it by pushing it back onto the eventloop */
475                 dfr = talloc(client, struct ctdb_deferred_requeue);
476                 if (dfr == NULL) {
477                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
478                         continue;
479                 }
480
481                 dfr->dfc    = talloc_steal(dfr, dfc);
482                 dfr->client = client;
483
484                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
485                                  reprocess_deferred_call, dfr);
486         }
487
488         return 0;
489 }
490
491 /* insert the new deferral context into the rb tree.
492    there should never be a pre-existing context here, but check for it
493    warn and destroy the previous context if there is already a deferral context
494    for this key.
495 */
496 static void *insert_dfq_callback(void *parm, void *data)
497 {
498         if (data) {
499                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
500                 talloc_free(data);
501         }
502         return parm;
503 }
504
505 /* if the original fetch-lock did not complete within a reasonable time,
506    free the context and context for all deferred requests to cause them to be
507    re-inserted into the event system.
508 */
509 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
510                         struct timeval t, void *private_data)
511 {
512         talloc_free(private_data);
513 }
514
515 /* This function is used in the local daemon to register a KEY in a database
516    for being "fetched"
517    While the remote fetch is in-flight, any futher attempts to re-fetch the
518    same record will be deferred until the fetch completes.
519 */
520 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
521 {
522         uint32_t *k;
523         struct ctdb_deferred_fetch_queue *dfq;
524
525         k = ctdb_key_to_idkey(call, call->key);
526         if (k == NULL) {
527                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
528                 return -1;
529         }
530
531         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
532         if (dfq == NULL) {
533                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
534                 talloc_free(k);
535                 return -1;
536         }
537         dfq->deferred_calls = NULL;
538
539         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
540
541         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
542
543         /* if the fetch havent completed in 30 seconds, just tear it all down
544            and let it try again as the events are reissued */
545         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
546                          dfq_timeout, dfq);
547
548         talloc_free(k);
549         return 0;
550 }
551
552 /* check if this is a duplicate request to a fetch already in-flight
553    if it is, make this call deferred to be reprocessed later when
554    the in-flight fetch completes.
555 */
556 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
557 {
558         uint32_t *k;
559         struct ctdb_deferred_fetch_queue *dfq;
560         struct ctdb_deferred_fetch_call *dfc;
561
562         k = ctdb_key_to_idkey(c, key);
563         if (k == NULL) {
564                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
565                 return -1;
566         }
567
568         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
569         if (dfq == NULL) {
570                 talloc_free(k);
571                 return -1;
572         }
573
574
575         talloc_free(k);
576
577         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
578         if (dfc == NULL) {
579                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
580                 return -1;
581         }
582
583         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
584         if (dfc->w == NULL) {
585                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
586                 talloc_free(dfc);
587                 return -1;
588         }
589
590         dfc->c = talloc_steal(dfc, c);
591         dfc->w->ctdb = ctdb_db->ctdb;
592         dfc->w->client_id = client->client_id;
593
594         DLIST_ADD_END(dfq->deferred_calls, dfc);
595
596         return 0;
597 }
598
599
600 /*
601   this is called when the ctdb daemon received a ctdb request call
602   from a local client over the unix domain socket
603  */
604 static void daemon_request_call_from_client(struct ctdb_client *client, 
605                                             struct ctdb_req_call_old *c)
606 {
607         struct ctdb_call_state *state;
608         struct ctdb_db_context *ctdb_db;
609         struct daemon_call_state *dstate;
610         struct ctdb_call *call;
611         struct ctdb_ltdb_header header;
612         TDB_DATA key, data;
613         int ret;
614         struct ctdb_context *ctdb = client->ctdb;
615         struct ctdb_daemon_packet_wrap *w;
616
617         CTDB_INCREMENT_STAT(ctdb, total_calls);
618         CTDB_INCREMENT_STAT(ctdb, pending_calls);
619
620         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
621         if (!ctdb_db) {
622                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
623                           c->db_id));
624                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
625                 return;
626         }
627
628         if (ctdb_db->unhealthy_reason) {
629                 /*
630                  * this is just a warning, as the tdb should be empty anyway,
631                  * and only persistent databases can be unhealthy, which doesn't
632                  * use this code patch
633                  */
634                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
635                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
636         }
637
638         key.dptr = c->data;
639         key.dsize = c->keylen;
640
641         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
642         CTDB_NO_MEMORY_VOID(ctdb, w);   
643
644         w->ctdb = ctdb;
645         w->client_id = client->client_id;
646
647         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
648                                            (struct ctdb_req_header *)c, &data,
649                                            daemon_incoming_packet_wrap, w, true);
650         if (ret == -2) {
651                 /* will retry later */
652                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
653                 return;
654         }
655
656         talloc_free(w);
657
658         if (ret != 0) {
659                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
660                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
661                 return;
662         }
663
664
665         /* check if this fetch request is a duplicate for a
666            request we already have in flight. If so defer it until
667            the first request completes.
668         */
669         if (ctdb->tunable.fetch_collapse == 1) {
670                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
671                         ret = ctdb_ltdb_unlock(ctdb_db, key);
672                         if (ret != 0) {
673                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
674                         }
675                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
676                         return;
677                 }
678         }
679
680         /* Dont do READONLY if we don't have a tracking database */
681         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
682                 c->flags &= ~CTDB_WANT_READONLY;
683         }
684
685         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
686                 header.flags &= ~CTDB_REC_RO_FLAGS;
687                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
688                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
689                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
690                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
691                 }
692                 /* and clear out the tracking data */
693                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
694                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
695                 }
696         }
697
698         /* if we are revoking, we must defer all other calls until the revoke
699          * had completed.
700          */
701         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
702                 talloc_free(data.dptr);
703                 ret = ctdb_ltdb_unlock(ctdb_db, key);
704
705                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
706                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
707                 }
708                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
709                 return;
710         }
711
712         if ((header.dmaster == ctdb->pnn)
713         && (!(c->flags & CTDB_WANT_READONLY))
714         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
715                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
716                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
717                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
718                 }
719                 ret = ctdb_ltdb_unlock(ctdb_db, key);
720
721                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
722                         ctdb_fatal(ctdb, "Failed to start record revoke");
723                 }
724                 talloc_free(data.dptr);
725
726                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
727                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
728                 }
729
730                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
731                 return;
732         }               
733
734         dstate = talloc(client, struct daemon_call_state);
735         if (dstate == NULL) {
736                 ret = ctdb_ltdb_unlock(ctdb_db, key);
737                 if (ret != 0) {
738                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
739                 }
740
741                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
742                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
743                 return;
744         }
745         dstate->start_time = timeval_current();
746         dstate->client = client;
747         dstate->reqid  = c->hdr.reqid;
748         talloc_steal(dstate, data.dptr);
749
750         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
751         if (call == NULL) {
752                 ret = ctdb_ltdb_unlock(ctdb_db, key);
753                 if (ret != 0) {
754                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
755                 }
756
757                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
758                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
759                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
760                 return;
761         }
762
763         dstate->readonly_fetch = 0;
764         call->call_id = c->callid;
765         call->key = key;
766         call->call_data.dptr = c->data + c->keylen;
767         call->call_data.dsize = c->calldatalen;
768         call->flags = c->flags;
769
770         if (c->flags & CTDB_WANT_READONLY) {
771                 /* client wants readonly record, so translate this into a 
772                    fetch with header. remember what the client asked for
773                    so we can remap the reply back to the proper format for
774                    the client in the reply
775                  */
776                 dstate->client_callid = call->call_id;
777                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
778                 dstate->readonly_fetch = 1;
779         }
780
781         if (header.dmaster == ctdb->pnn) {
782                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
783         } else {
784                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
785                 if (ctdb->tunable.fetch_collapse == 1) {
786                         /* This request triggered a remote fetch-lock.
787                            set up a deferral for this key so any additional
788                            fetch-locks are deferred until the current one
789                            finishes.
790                          */
791                         setup_deferred_fetch_locks(ctdb_db, call);
792                 }
793         }
794
795         ret = ctdb_ltdb_unlock(ctdb_db, key);
796         if (ret != 0) {
797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
798         }
799
800         if (state == NULL) {
801                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
802                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
803                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
804                 return;
805         }
806         talloc_steal(state, dstate);
807         talloc_steal(client, state);
808
809         state->async.fn = daemon_call_from_client_callback;
810         state->async.private_data = dstate;
811 }
812
813
814 static void daemon_request_control_from_client(struct ctdb_client *client, 
815                                                struct ctdb_req_control_old *c);
816
817 /* data contains a packet from the client */
818 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
819 {
820         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
821         TALLOC_CTX *tmp_ctx;
822         struct ctdb_context *ctdb = client->ctdb;
823
824         /* place the packet as a child of a tmp_ctx. We then use
825            talloc_free() below to free it. If any of the calls want
826            to keep it, then they will steal it somewhere else, and the
827            talloc_free() will be a no-op */
828         tmp_ctx = talloc_new(client);
829         talloc_steal(tmp_ctx, hdr);
830
831         if (hdr->ctdb_magic != CTDB_MAGIC) {
832                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
833                 goto done;
834         }
835
836         if (hdr->ctdb_version != CTDB_PROTOCOL) {
837                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
838                 goto done;
839         }
840
841         switch (hdr->operation) {
842         case CTDB_REQ_CALL:
843                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
844                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
845                 break;
846
847         case CTDB_REQ_MESSAGE:
848                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
849                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
850                 break;
851
852         case CTDB_REQ_CONTROL:
853                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
854                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
855                 break;
856
857         default:
858                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
859                          hdr->operation));
860         }
861
862 done:
863         talloc_free(tmp_ctx);
864 }
865
866 /*
867   called when the daemon gets a incoming packet
868  */
869 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
870 {
871         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
872         struct ctdb_req_header *hdr;
873
874         if (cnt == 0) {
875                 talloc_free(client);
876                 return;
877         }
878
879         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
880
881         if (cnt < sizeof(*hdr)) {
882                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
883                                (unsigned)cnt);
884                 return;
885         }
886         hdr = (struct ctdb_req_header *)data;
887         if (cnt != hdr->length) {
888                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
889                                (unsigned)hdr->length, (unsigned)cnt);
890                 return;
891         }
892
893         if (hdr->ctdb_magic != CTDB_MAGIC) {
894                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
895                 return;
896         }
897
898         if (hdr->ctdb_version != CTDB_PROTOCOL) {
899                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
900                 return;
901         }
902
903         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
904                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
905                  hdr->srcnode, hdr->destnode));
906
907         /* it is the responsibility of the incoming packet function to free 'data' */
908         daemon_incoming_packet(client, hdr);
909 }
910
911
912 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
913 {
914         if (client_pid->ctdb->client_pids != NULL) {
915                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
916         }
917
918         return 0;
919 }
920
921
922 static void ctdb_accept_client(struct tevent_context *ev,
923                                struct tevent_fd *fde, uint16_t flags,
924                                void *private_data)
925 {
926         struct sockaddr_un addr;
927         socklen_t len;
928         int fd;
929         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
930         struct ctdb_client *client;
931         struct ctdb_client_pid_list *client_pid;
932         pid_t peer_pid = 0;
933
934         memset(&addr, 0, sizeof(addr));
935         len = sizeof(addr);
936         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
937         if (fd == -1) {
938                 return;
939         }
940
941         set_blocking(fd, false);
942         set_close_on_exec(fd);
943
944         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
945
946         client = talloc_zero(ctdb, struct ctdb_client);
947         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
948                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
949         }
950
951         client->ctdb = ctdb;
952         client->fd = fd;
953         client->client_id = reqid_new(ctdb->idr, client);
954         client->pid = peer_pid;
955
956         client_pid = talloc(client, struct ctdb_client_pid_list);
957         if (client_pid == NULL) {
958                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
959                 close(fd);
960                 talloc_free(client);
961                 return;
962         }               
963         client_pid->ctdb   = ctdb;
964         client_pid->pid    = peer_pid;
965         client_pid->client = client;
966
967         DLIST_ADD(ctdb->client_pids, client_pid);
968
969         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
970                                          ctdb_daemon_read_cb, client,
971                                          "client-%u", client->pid);
972
973         talloc_set_destructor(client, ctdb_client_destructor);
974         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
975         ctdb->num_clients++;
976 }
977
978
979
980 /*
981   create a unix domain socket and bind it
982   return a file descriptor open on the socket 
983 */
984 static int ux_socket_bind(struct ctdb_context *ctdb)
985 {
986         struct sockaddr_un addr;
987
988         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
989         if (ctdb->daemon.sd == -1) {
990                 return -1;
991         }
992
993         memset(&addr, 0, sizeof(addr));
994         addr.sun_family = AF_UNIX;
995         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
996
997         /* First check if an old ctdbd might be running */
998         if (connect(ctdb->daemon.sd,
999                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
1000                 DEBUG(DEBUG_CRIT,
1001                       ("Something is already listening on ctdb socket '%s'\n",
1002                        ctdb->daemon.name));
1003                 goto failed;
1004         }
1005
1006         /* Remove any old socket */
1007         unlink(ctdb->daemon.name);
1008
1009         set_close_on_exec(ctdb->daemon.sd);
1010         set_blocking(ctdb->daemon.sd, false);
1011
1012         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1013                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1014                 goto failed;
1015         }
1016
1017         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1018             chmod(ctdb->daemon.name, 0700) != 0) {
1019                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1020                 goto failed;
1021         }
1022
1023
1024         if (listen(ctdb->daemon.sd, 100) != 0) {
1025                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1026                 goto failed;
1027         }
1028
1029         return 0;
1030
1031 failed:
1032         close(ctdb->daemon.sd);
1033         ctdb->daemon.sd = -1;
1034         return -1;      
1035 }
1036
1037 static void initialise_node_flags (struct ctdb_context *ctdb)
1038 {
1039         if (ctdb->pnn == -1) {
1040                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1041         }
1042
1043         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1044
1045         /* do we start out in DISABLED mode? */
1046         if (ctdb->start_as_disabled != 0) {
1047                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1048                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1049         }
1050         /* do we start out in STOPPED mode? */
1051         if (ctdb->start_as_stopped != 0) {
1052                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1053                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1054         }
1055 }
1056
1057 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1058                                       void *private_data)
1059 {
1060         if (status != 0) {
1061                 ctdb_die(ctdb, "Failed to run setup event");
1062         }
1063         ctdb_run_notification_script(ctdb, "setup");
1064
1065         /* tell all other nodes we've just started up */
1066         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1067                                  0, CTDB_CONTROL_STARTUP, 0,
1068                                  CTDB_CTRL_FLAG_NOREPLY,
1069                                  tdb_null, NULL, NULL);
1070
1071         /* Start the recovery daemon */
1072         if (ctdb_start_recoverd(ctdb) != 0) {
1073                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1074                 exit(11);
1075         }
1076
1077         ctdb_start_periodic_events(ctdb);
1078
1079         ctdb_wait_for_first_recovery(ctdb);
1080 }
1081
1082 static struct timeval tevent_before_wait_ts;
1083 static struct timeval tevent_after_wait_ts;
1084
1085 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1086                               void *private_data)
1087 {
1088         struct timeval diff;
1089         struct timeval now;
1090         struct ctdb_context *ctdb =
1091                 talloc_get_type(private_data, struct ctdb_context);
1092
1093         if (getpid() != ctdb->ctdbd_pid) {
1094                 return;
1095         }
1096
1097         now = timeval_current();
1098
1099         switch (tp) {
1100         case TEVENT_TRACE_BEFORE_WAIT:
1101                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1102                         diff = timeval_until(&tevent_after_wait_ts, &now);
1103                         if (diff.tv_sec > 3) {
1104                                 DEBUG(DEBUG_ERR,
1105                                       ("Handling event took %ld seconds!\n",
1106                                        (long)diff.tv_sec));
1107                         }
1108                 }
1109                 tevent_before_wait_ts = now;
1110                 break;
1111
1112         case TEVENT_TRACE_AFTER_WAIT:
1113                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1114                         diff = timeval_until(&tevent_before_wait_ts, &now);
1115                         if (diff.tv_sec > 3) {
1116                                 DEBUG(DEBUG_CRIT,
1117                                       ("No event for %ld seconds!\n",
1118                                        (long)diff.tv_sec));
1119                         }
1120                 }
1121                 tevent_after_wait_ts = now;
1122                 break;
1123
1124         default:
1125                 /* Do nothing for future tevent trace points */ ;
1126         }
1127 }
1128
1129 static void ctdb_remove_pidfile(void)
1130 {
1131         /* Only the main ctdbd's PID matches the SID */
1132         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1133                 if (unlink(ctdbd_pidfile) == 0) {
1134                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1135                                              ctdbd_pidfile));
1136                 } else {
1137                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1138                                               ctdbd_pidfile));
1139                 }
1140         }
1141 }
1142
1143 static void ctdb_create_pidfile(pid_t pid)
1144 {
1145         if (ctdbd_pidfile != NULL) {
1146                 FILE *fp;
1147
1148                 fp = fopen(ctdbd_pidfile, "w");
1149                 if (fp == NULL) {
1150                         DEBUG(DEBUG_ALERT,
1151                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1152                         exit(11);
1153                 }
1154
1155                 fprintf(fp, "%d\n", pid);
1156                 fclose(fp);
1157                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1158                 atexit(ctdb_remove_pidfile);
1159         }
1160 }
1161
1162 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1163 {
1164         int i, j, count;
1165
1166         /* initialize the vnn mapping table, skipping any deleted nodes */
1167         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1168         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1169
1170         count = 0;
1171         for (i = 0; i < ctdb->num_nodes; i++) {
1172                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1173                         count++;
1174                 }
1175         }
1176
1177         ctdb->vnn_map->generation = INVALID_GENERATION;
1178         ctdb->vnn_map->size = count;
1179         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1180         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1181
1182         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1183                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1184                         continue;
1185                 }
1186                 ctdb->vnn_map->map[j] = i;
1187                 j++;
1188         }
1189 }
1190
1191 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1192 {
1193         int nodeid;
1194
1195         if (ctdb->address == NULL) {
1196                 ctdb_fatal(ctdb,
1197                            "Can not determine PNN - node address is not set\n");
1198         }
1199
1200         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1201         if (nodeid == -1) {
1202                 ctdb_fatal(ctdb,
1203                            "Can not determine PNN - node address not found in node list\n");
1204         }
1205
1206         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1207         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1208 }
1209
1210 /*
1211   start the protocol going as a daemon
1212 */
1213 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1214 {
1215         int res, ret = -1;
1216         struct tevent_fd *fde;
1217
1218         /* create a unix domain stream socket to listen to */
1219         res = ux_socket_bind(ctdb);
1220         if (res!=0) {
1221                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1222                 exit(10);
1223         }
1224
1225         if (do_fork && fork()) {
1226                 return 0;
1227         }
1228
1229         tdb_reopen_all(false);
1230
1231         if (do_fork) {
1232                 if (setsid() == -1) {
1233                         ctdb_die(ctdb, "Failed to setsid()\n");
1234                 }
1235                 close(0);
1236                 if (open("/dev/null", O_RDONLY) != 0) {
1237                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1238                         exit(11);
1239                 }
1240         }
1241         ignore_signal(SIGPIPE);
1242         ignore_signal(SIGUSR1);
1243
1244         ctdb->ctdbd_pid = getpid();
1245         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1246                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1247         ctdb_create_pidfile(ctdb->ctdbd_pid);
1248
1249         /* Make sure we log something when the daemon terminates.
1250          * This must be the first exit handler to run (so the last to
1251          * be registered.
1252          */
1253         atexit(print_exit_message);
1254
1255         if (ctdb->do_setsched) {
1256                 /* try to set us up as realtime */
1257                 if (!set_scheduler()) {
1258                         exit(1);
1259                 }
1260                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1261         }
1262
1263         ctdb->ev = tevent_context_init(NULL);
1264         tevent_loop_allow_nesting(ctdb->ev);
1265         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1266         ret = ctdb_init_tevent_logging(ctdb);
1267         if (ret != 0) {
1268                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1269                 exit(1);
1270         }
1271
1272         /* set up a handler to pick up sigchld */
1273         if (ctdb_init_sigchld(ctdb) == NULL) {
1274                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1275                 exit(1);
1276         }
1277
1278         ctdb_set_child_logging(ctdb);
1279
1280         TALLOC_FREE(ctdb->srv);
1281         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1282                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1283                 exit(1);
1284         }
1285
1286         /* initialize statistics collection */
1287         ctdb_statistics_init(ctdb);
1288
1289         /* force initial recovery for election */
1290         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1291
1292         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1293         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1294         if (ret != 0) {
1295                 ctdb_die(ctdb, "Failed to run init event\n");
1296         }
1297         ctdb_run_notification_script(ctdb, "init");
1298
1299         if (strcmp(ctdb->transport, "tcp") == 0) {
1300                 ret = ctdb_tcp_init(ctdb);
1301         }
1302 #ifdef USE_INFINIBAND
1303         if (strcmp(ctdb->transport, "ib") == 0) {
1304                 ret = ctdb_ibw_init(ctdb);
1305         }
1306 #endif
1307         if (ret != 0) {
1308                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1309                 return -1;
1310         }
1311
1312         if (ctdb->methods == NULL) {
1313                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1314                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1315         }
1316
1317         /* Initialise the transport.  This sets the node address if it
1318          * was not set via the command-line. */
1319         if (ctdb->methods->initialise(ctdb) != 0) {
1320                 ctdb_fatal(ctdb, "transport failed to initialise");
1321         }
1322
1323         ctdb_set_my_pnn(ctdb);
1324
1325         initialise_node_flags(ctdb);
1326
1327         if (ctdb->public_addresses_file) {
1328                 ret = ctdb_set_public_addresses(ctdb, true);
1329                 if (ret == -1) {
1330                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1331                         exit(1);
1332                 }
1333         }
1334
1335         ctdb_initialise_vnn_map(ctdb);
1336
1337         /* attach to existing databases */
1338         if (ctdb_attach_databases(ctdb) != 0) {
1339                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1340         }
1341
1342         /* start frozen, then let the first election sort things out */
1343         if (!ctdb_blocking_freeze(ctdb)) {
1344                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1345         }
1346
1347         /* now start accepting clients, only can do this once frozen */
1348         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1349                             ctdb_accept_client, ctdb);
1350         if (fde == NULL) {
1351                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1352         }
1353         tevent_fd_set_auto_close(fde);
1354
1355         /* Start the transport */
1356         if (ctdb->methods->start(ctdb) != 0) {
1357                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1358                 ctdb_fatal(ctdb, "transport failed to start");
1359         }
1360
1361         /* Recovery daemon and timed events are started from the
1362          * callback, only after the setup event completes
1363          * successfully.
1364          */
1365         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1366         ret = ctdb_event_script_callback(ctdb,
1367                                          ctdb,
1368                                          ctdb_setup_event_callback,
1369                                          ctdb,
1370                                          CTDB_EVENT_SETUP,
1371                                          "%s",
1372                                          "");
1373         if (ret != 0) {
1374                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1375                 exit(1);
1376         }
1377
1378         lockdown_memory(ctdb->valgrinding);
1379
1380         /* go into a wait loop to allow other nodes to complete */
1381         tevent_loop_wait(ctdb->ev);
1382
1383         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1384         exit(1);
1385 }
1386
1387 /*
1388   allocate a packet for use in daemon<->daemon communication
1389  */
1390 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1391                                                  TALLOC_CTX *mem_ctx, 
1392                                                  enum ctdb_operation operation, 
1393                                                  size_t length, size_t slength,
1394                                                  const char *type)
1395 {
1396         int size;
1397         struct ctdb_req_header *hdr;
1398
1399         length = MAX(length, slength);
1400         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1401
1402         if (ctdb->methods == NULL) {
1403                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1404                          operation, (unsigned)length));
1405                 return NULL;
1406         }
1407
1408         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1409         if (hdr == NULL) {
1410                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1411                          operation, (unsigned)length));
1412                 return NULL;
1413         }
1414         talloc_set_name_const(hdr, type);
1415         memset(hdr, 0, slength);
1416         hdr->length       = length;
1417         hdr->operation    = operation;
1418         hdr->ctdb_magic   = CTDB_MAGIC;
1419         hdr->ctdb_version = CTDB_PROTOCOL;
1420         hdr->generation   = ctdb->vnn_map->generation;
1421         hdr->srcnode      = ctdb->pnn;
1422
1423         return hdr;     
1424 }
1425
1426 struct daemon_control_state {
1427         struct daemon_control_state *next, *prev;
1428         struct ctdb_client *client;
1429         struct ctdb_req_control_old *c;
1430         uint32_t reqid;
1431         struct ctdb_node *node;
1432 };
1433
1434 /*
1435   callback when a control reply comes in
1436  */
1437 static void daemon_control_callback(struct ctdb_context *ctdb,
1438                                     int32_t status, TDB_DATA data, 
1439                                     const char *errormsg,
1440                                     void *private_data)
1441 {
1442         struct daemon_control_state *state = talloc_get_type(private_data, 
1443                                                              struct daemon_control_state);
1444         struct ctdb_client *client = state->client;
1445         struct ctdb_reply_control_old *r;
1446         size_t len;
1447         int ret;
1448
1449         /* construct a message to send to the client containing the data */
1450         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1451         if (errormsg) {
1452                 len += strlen(errormsg);
1453         }
1454         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1455                                struct ctdb_reply_control_old);
1456         CTDB_NO_MEMORY_VOID(ctdb, r);
1457
1458         r->hdr.reqid     = state->reqid;
1459         r->status        = status;
1460         r->datalen       = data.dsize;
1461         r->errorlen = 0;
1462         memcpy(&r->data[0], data.dptr, data.dsize);
1463         if (errormsg) {
1464                 r->errorlen = strlen(errormsg);
1465                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1466         }
1467
1468         ret = daemon_queue_send(client, &r->hdr);
1469         if (ret != -1) {
1470                 talloc_free(state);
1471         }
1472 }
1473
1474 /*
1475   fail all pending controls to a disconnected node
1476  */
1477 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1478 {
1479         struct daemon_control_state *state;
1480         while ((state = node->pending_controls)) {
1481                 DLIST_REMOVE(node->pending_controls, state);
1482                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1483                                         "node is disconnected", state);
1484         }
1485 }
1486
1487 /*
1488   destroy a daemon_control_state
1489  */
1490 static int daemon_control_destructor(struct daemon_control_state *state)
1491 {
1492         if (state->node) {
1493                 DLIST_REMOVE(state->node->pending_controls, state);
1494         }
1495         return 0;
1496 }
1497
1498 /*
1499   this is called when the ctdb daemon received a ctdb request control
1500   from a local client over the unix domain socket
1501  */
1502 static void daemon_request_control_from_client(struct ctdb_client *client, 
1503                                                struct ctdb_req_control_old *c)
1504 {
1505         TDB_DATA data;
1506         int res;
1507         struct daemon_control_state *state;
1508         TALLOC_CTX *tmp_ctx = talloc_new(client);
1509
1510         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1511                 c->hdr.destnode = client->ctdb->pnn;
1512         }
1513
1514         state = talloc(client, struct daemon_control_state);
1515         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1516
1517         state->client = client;
1518         state->c = talloc_steal(state, c);
1519         state->reqid = c->hdr.reqid;
1520         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1521                 state->node = client->ctdb->nodes[c->hdr.destnode];
1522                 DLIST_ADD(state->node->pending_controls, state);
1523         } else {
1524                 state->node = NULL;
1525         }
1526
1527         talloc_set_destructor(state, daemon_control_destructor);
1528
1529         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1530                 talloc_steal(tmp_ctx, state);
1531         }
1532         
1533         data.dptr = &c->data[0];
1534         data.dsize = c->datalen;
1535         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1536                                        c->srvid, c->opcode, client->client_id,
1537                                        c->flags,
1538                                        data, daemon_control_callback,
1539                                        state);
1540         if (res != 0) {
1541                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1542                          c->hdr.destnode));
1543         }
1544
1545         talloc_free(tmp_ctx);
1546 }
1547
1548 /*
1549   register a call function
1550 */
1551 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1552                          ctdb_fn_t fn, int id)
1553 {
1554         struct ctdb_registered_call *call;
1555         struct ctdb_db_context *ctdb_db;
1556
1557         ctdb_db = find_ctdb_db(ctdb, db_id);
1558         if (ctdb_db == NULL) {
1559                 return -1;
1560         }
1561
1562         call = talloc(ctdb_db, struct ctdb_registered_call);
1563         call->fn = fn;
1564         call->id = id;
1565
1566         DLIST_ADD(ctdb_db->calls, call);        
1567         return 0;
1568 }
1569
1570
1571
1572 /*
1573   this local messaging handler is ugly, but is needed to prevent
1574   recursion in ctdb_send_message() when the destination node is the
1575   same as the source node
1576  */
1577 struct ctdb_local_message {
1578         struct ctdb_context *ctdb;
1579         uint64_t srvid;
1580         TDB_DATA data;
1581 };
1582
1583 static void ctdb_local_message_trigger(struct tevent_context *ev,
1584                                        struct tevent_timer *te,
1585                                        struct timeval t, void *private_data)
1586 {
1587         struct ctdb_local_message *m = talloc_get_type(
1588                 private_data, struct ctdb_local_message);
1589
1590         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1591         talloc_free(m);
1592 }
1593
1594 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1595 {
1596         struct ctdb_local_message *m;
1597         m = talloc(ctdb, struct ctdb_local_message);
1598         CTDB_NO_MEMORY(ctdb, m);
1599
1600         m->ctdb = ctdb;
1601         m->srvid = srvid;
1602         m->data  = data;
1603         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1604         if (m->data.dptr == NULL) {
1605                 talloc_free(m);
1606                 return -1;
1607         }
1608
1609         /* this needs to be done as an event to prevent recursion */
1610         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1611                          ctdb_local_message_trigger, m);
1612         return 0;
1613 }
1614
1615 /*
1616   send a ctdb message
1617 */
1618 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1619                              uint64_t srvid, TDB_DATA data)
1620 {
1621         struct ctdb_req_message_old *r;
1622         int len;
1623
1624         if (ctdb->methods == NULL) {
1625                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1626                 return -1;
1627         }
1628
1629         /* see if this is a message to ourselves */
1630         if (pnn == ctdb->pnn) {
1631                 return ctdb_local_message(ctdb, srvid, data);
1632         }
1633
1634         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1635         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1636                                     struct ctdb_req_message_old);
1637         CTDB_NO_MEMORY(ctdb, r);
1638
1639         r->hdr.destnode  = pnn;
1640         r->srvid         = srvid;
1641         r->datalen       = data.dsize;
1642         memcpy(&r->data[0], data.dptr, data.dsize);
1643
1644         ctdb_queue_packet(ctdb, &r->hdr);
1645
1646         talloc_free(r);
1647         return 0;
1648 }
1649
1650
1651
1652 struct ctdb_client_notify_list {
1653         struct ctdb_client_notify_list *next, *prev;
1654         struct ctdb_context *ctdb;
1655         uint64_t srvid;
1656         TDB_DATA data;
1657 };
1658
1659
1660 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1661 {
1662         int ret;
1663
1664         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1665
1666         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1667         if (ret != 0) {
1668                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1669         }
1670
1671         return 0;
1672 }
1673
1674 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1675 {
1676         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1677         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1678         struct ctdb_client_notify_list *nl;
1679
1680         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1681
1682         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1683                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1684                 return -1;
1685         }
1686
1687         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1688                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1689                 return -1;
1690         }
1691
1692
1693         if (client == NULL) {
1694                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1695                 return -1;
1696         }
1697
1698         for(nl=client->notify; nl; nl=nl->next) {
1699                 if (nl->srvid == notify->srvid) {
1700                         break;
1701                 }
1702         }
1703         if (nl != NULL) {
1704                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1705                 return -1;
1706         }
1707
1708         nl = talloc(client, struct ctdb_client_notify_list);
1709         CTDB_NO_MEMORY(ctdb, nl);
1710         nl->ctdb       = ctdb;
1711         nl->srvid      = notify->srvid;
1712         nl->data.dsize = notify->len;
1713         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1714                                        nl->data.dsize);
1715         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1716         
1717         DLIST_ADD(client->notify, nl);
1718         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1719
1720         return 0;
1721 }
1722
1723 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1724 {
1725         uint64_t srvid = *(uint64_t *)indata.dptr;
1726         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1727         struct ctdb_client_notify_list *nl;
1728
1729         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1730
1731         if (client == NULL) {
1732                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1733                 return -1;
1734         }
1735
1736         for(nl=client->notify; nl; nl=nl->next) {
1737                 if (nl->srvid == srvid) {
1738                         break;
1739                 }
1740         }
1741         if (nl == NULL) {
1742                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1743                 return -1;
1744         }
1745
1746         DLIST_REMOVE(client->notify, nl);
1747         talloc_set_destructor(nl, NULL);
1748         talloc_free(nl);
1749
1750         return 0;
1751 }
1752
1753 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1754 {
1755         struct ctdb_client_pid_list *client_pid;
1756
1757         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1758                 if (client_pid->pid == pid) {
1759                         return client_pid->client;
1760                 }
1761         }
1762         return NULL;
1763 }
1764
1765
1766 /* This control is used by samba when probing if a process (of a samba daemon)
1767    exists on the node.
1768    Samba does this when it needs/wants to check if a subrecord in one of the
1769    databases is still valied, or if it is stale and can be removed.
1770    If the node is in unhealthy or stopped state we just kill of the samba
1771    process holding htis sub-record and return to the calling samba that
1772    the process does not exist.
1773    This allows us to forcefully recall subrecords registered by samba processes
1774    on banned and stopped nodes.
1775 */
1776 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1777 {
1778         struct ctdb_client *client;
1779
1780         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1781                 client = ctdb_find_client_by_pid(ctdb, pid);
1782                 if (client != NULL) {
1783                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1784                         talloc_free(client);
1785                 }
1786                 return -1;
1787         }
1788
1789         return kill(pid, 0);
1790 }
1791
1792 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1793 {
1794         struct ctdb_node_map_old *node_map = NULL;
1795
1796         CHECK_CONTROL_DATA_SIZE(0);
1797
1798         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1799         if (node_map == NULL) {
1800                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1801                 return -1;
1802         }
1803
1804         outdata->dptr  = (unsigned char *)node_map;
1805         outdata->dsize = talloc_get_size(outdata->dptr);
1806
1807         return 0;
1808 }
1809
1810 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1811 {
1812         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1813                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1814                 return;
1815         }
1816
1817         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1818         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1819         ctdb_stop_recoverd(ctdb);
1820         ctdb_stop_keepalive(ctdb);
1821         ctdb_stop_monitoring(ctdb);
1822         ctdb_release_all_ips(ctdb);
1823         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1824         if (ctdb->methods != NULL) {
1825                 ctdb->methods->shutdown(ctdb);
1826         }
1827
1828         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1829         exit(exit_code);
1830 }
1831
1832 /* When forking the main daemon and the child process needs to connect
1833  * back to the daemon as a client process, this function can be used
1834  * to change the ctdb context from daemon into client mode.  The child
1835  * process must be created using ctdb_fork() and not fork() -
1836  * ctdb_fork() does some necessary housekeeping.
1837  */
1838 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1839 {
1840         int ret;
1841         va_list ap;
1842
1843         /* Add extra information so we can identify this in the logs */
1844         va_start(ap, fmt);
1845         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1846         va_end(ap);
1847
1848         /* get a new event context */
1849         ctdb->ev = tevent_context_init(ctdb);
1850         tevent_loop_allow_nesting(ctdb->ev);
1851
1852         /* Connect to main CTDB daemon */
1853         ret = ctdb_socket_connect(ctdb);
1854         if (ret != 0) {
1855                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1856                 return -1;
1857         }
1858
1859         ctdb->can_send_controls = true;
1860
1861         return 0;
1862 }