ctdb-daemon: Don't index by PNN when initialising node flags
[samba.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75 #ifdef HAVE_GETRUSAGE
76
77 struct cpu_check_threshold_data {
78         unsigned short percent;
79         struct timeval timeofday;
80         struct timeval ru_time;
81 };
82
83 static void ctdb_cpu_check_threshold(struct tevent_context *ev,
84                                      struct tevent_timer *te,
85                                      struct timeval tv,
86                                      void *private_data)
87 {
88         struct ctdb_context *ctdb = talloc_get_type_abort(
89                 private_data, struct ctdb_context);
90         uint32_t interval = 60;
91
92         static unsigned short threshold = 0;
93         static struct cpu_check_threshold_data prev = {
94                 .percent = 0,
95                 .timeofday = { .tv_sec = 0 },
96                 .ru_time = { .tv_sec = 0 },
97         };
98
99         struct rusage usage;
100         struct cpu_check_threshold_data curr = {
101                 .percent = 0,
102         };
103         int64_t ru_time_diff, timeofday_diff;
104         bool first;
105         int ret;
106
107         /*
108          * Cache the threshold so that we don't waste time checking
109          * the environment variable every time
110          */
111         if (threshold == 0) {
112                 const char *t;
113
114                 threshold = 90;
115
116                 t = getenv("CTDB_TEST_CPU_USAGE_THRESHOLD");
117                 if (t != NULL) {
118                         int th;
119
120                         th = atoi(t);
121                         if (th <= 0 || th > 100) {
122                                 DBG_WARNING("Failed to parse env var: %s\n", t);
123                         } else {
124                                 threshold = th;
125                         }
126                 }
127         }
128
129         ret = getrusage(RUSAGE_SELF, &usage);
130         if (ret != 0) {
131                 DBG_WARNING("rusage() failed: %d\n", ret);
132                 goto next;
133         }
134
135         /* Sum the system and user CPU usage */
136         curr.ru_time = timeval_sum(&usage.ru_utime, &usage.ru_stime);
137
138         curr.timeofday = tv;
139
140         first = timeval_is_zero(&prev.timeofday);
141         if (first) {
142                 /* No previous values recorded so no calculation to do */
143                 goto done;
144         }
145
146         timeofday_diff = usec_time_diff(&curr.timeofday, &prev.timeofday);
147         if (timeofday_diff <= 0) {
148                 /*
149                  * Time went backwards or didn't progress so no (sane)
150                  * calculation can be done
151                  */
152                 goto done;
153         }
154
155         ru_time_diff = usec_time_diff(&curr.ru_time, &prev.ru_time);
156
157         curr.percent = ru_time_diff * 100 / timeofday_diff;
158
159         if (curr.percent >= threshold) {
160                 /* Log only if the utilisation changes */
161                 if (curr.percent != prev.percent) {
162                         D_WARNING("WARNING: CPU utilisation %hu%% >= "
163                                   "threshold (%hu%%)\n",
164                                   curr.percent,
165                                   threshold);
166                 }
167         } else {
168                 /* Log if the utilisation falls below the threshold */
169                 if (prev.percent >= threshold) {
170                         D_WARNING("WARNING: CPU utilisation %hu%% < "
171                                   "threshold (%hu%%)\n",
172                                   curr.percent,
173                                   threshold);
174                 }
175         }
176
177 done:
178         prev = curr;
179
180 next:
181         tevent_add_timer(ctdb->ev, ctdb,
182                          timeval_current_ofs(interval, 0),
183                          ctdb_cpu_check_threshold,
184                          ctdb);
185 }
186
187 static void ctdb_start_cpu_check_threshold(struct ctdb_context *ctdb)
188 {
189         tevent_add_timer(ctdb->ev, ctdb,
190                          timeval_current(),
191                          ctdb_cpu_check_threshold,
192                          ctdb);
193 }
194 #endif /* HAVE_GETRUSAGE */
195
196 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
197                                   struct timeval t, void *private_data)
198 {
199         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
200
201         if (getpid() != ctdb->ctdbd_pid) {
202                 return;
203         }
204
205         tevent_add_timer(ctdb->ev, ctdb,
206                          timeval_current_ofs(1, 0),
207                          ctdb_time_tick, ctdb);
208 }
209
210 /* Used to trigger a dummy event once per second, to make
211  * detection of hangs more reliable.
212  */
213 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
214 {
215         tevent_add_timer(ctdb->ev, ctdb,
216                          timeval_current_ofs(1, 0),
217                          ctdb_time_tick, ctdb);
218 }
219
220 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
221 {
222         /* start monitoring for connected/disconnected nodes */
223         ctdb_start_keepalive(ctdb);
224
225         /* start periodic update of tcp tickle lists */
226         ctdb_start_tcp_tickle_update(ctdb);
227
228         /* start listening for recovery daemon pings */
229         ctdb_control_recd_ping(ctdb);
230
231         /* start listening to timer ticks */
232         ctdb_start_time_tickd(ctdb);
233
234 #ifdef HAVE_GETRUSAGE
235         ctdb_start_cpu_check_threshold(ctdb);
236 #endif /* HAVE_GETRUSAGE */
237 }
238
239 static void ignore_signal(int signum)
240 {
241         struct sigaction act;
242
243         memset(&act, 0, sizeof(act));
244
245         act.sa_handler = SIG_IGN;
246         sigemptyset(&act.sa_mask);
247         sigaddset(&act.sa_mask, signum);
248         sigaction(signum, &act, NULL);
249 }
250
251
252 /*
253   send a packet to a client
254  */
255 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
256 {
257         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
258         if (hdr->operation == CTDB_REQ_MESSAGE) {
259                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
260                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
261                         talloc_free(client);
262                         return -1;
263                 }
264         }
265         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
266 }
267
268 /*
269   message handler for when we are in daemon mode. This redirects the message
270   to the right client
271  */
272 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
273                                    void *private_data)
274 {
275         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
276         struct ctdb_req_message_old *r;
277         int len;
278
279         /* construct a message to send to the client containing the data */
280         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
281         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
282                                len, struct ctdb_req_message_old);
283         CTDB_NO_MEMORY_VOID(client->ctdb, r);
284
285         talloc_set_name_const(r, "req_message packet");
286
287         r->srvid         = srvid;
288         r->datalen       = data.dsize;
289         memcpy(&r->data[0], data.dptr, data.dsize);
290
291         daemon_queue_send(client, &r->hdr);
292
293         talloc_free(r);
294 }
295
296 /*
297   this is called when the ctdb daemon received a ctdb request to 
298   set the srvid from the client
299  */
300 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
301 {
302         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
303         int res;
304         if (client == NULL) {
305                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
306                 return -1;
307         }
308         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
309                              client);
310         if (res != 0) {
311                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
312                          (unsigned long long)srvid));
313         } else {
314                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
315                          (unsigned long long)srvid));
316         }
317
318         return res;
319 }
320
321 /*
322   this is called when the ctdb daemon received a ctdb request to 
323   remove a srvid from the client
324  */
325 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
326 {
327         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
328         if (client == NULL) {
329                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
330                 return -1;
331         }
332         return srvid_deregister(ctdb->srv, srvid, client);
333 }
334
335 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
336                            void *private_data)
337 {
338         struct ctdb_client *client =
339                 talloc_get_type_abort(private_data, struct ctdb_client);
340         struct ctdb_req_tunnel_old *c, *pkt;
341         size_t len;
342
343         pkt = (struct ctdb_req_tunnel_old *)data.dptr;
344
345         len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
346         c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
347                                len, struct ctdb_req_tunnel_old);
348         if (c == NULL) {
349                 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
350                 return;
351         }
352
353         talloc_set_name_const(c, "req_tunnel packet");
354
355         c->tunnel_id = tunnel_id;
356         c->flags = pkt->flags;
357         c->datalen = pkt->datalen;
358         memcpy(c->data, pkt->data, pkt->datalen);
359
360         daemon_queue_send(client, &c->hdr);
361
362         talloc_free(c);
363 }
364
365 /*
366   destroy a ctdb_client
367 */
368 static int ctdb_client_destructor(struct ctdb_client *client)
369 {
370         struct ctdb_db_context *ctdb_db;
371
372         ctdb_takeover_client_destructor_hook(client);
373         reqid_remove(client->ctdb->idr, client->client_id);
374         client->ctdb->num_clients--;
375
376         if (client->num_persistent_updates != 0) {
377                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
378                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
379         }
380         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
381         if (ctdb_db) {
382                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
383                                   "commit active. Forcing recovery.\n"));
384                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
385
386                 /*
387                  * trans3 transaction state:
388                  *
389                  * The destructor sets the pointer to NULL.
390                  */
391                 talloc_free(ctdb_db->persistent_state);
392         }
393
394         return 0;
395 }
396
397
398 /*
399   this is called when the ctdb daemon received a ctdb request message
400   from a local client over the unix domain socket
401  */
402 static void daemon_request_message_from_client(struct ctdb_client *client, 
403                                                struct ctdb_req_message_old *c)
404 {
405         TDB_DATA data;
406         int res;
407
408         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
409                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
410         }
411
412         /* maybe the message is for another client on this node */
413         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
414                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
415                 return;
416         }
417
418         /* its for a remote node */
419         data.dptr = &c->data[0];
420         data.dsize = c->datalen;
421         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
422                                        c->srvid, data);
423         if (res != 0) {
424                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
425                          c->hdr.destnode));
426         }
427 }
428
429
430 struct daemon_call_state {
431         struct ctdb_client *client;
432         uint32_t reqid;
433         struct ctdb_call *call;
434         struct timeval start_time;
435
436         /* readonly request ? */
437         uint32_t readonly_fetch;
438         uint32_t client_callid;
439 };
440
441 /* 
442    complete a call from a client 
443 */
444 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
445 {
446         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
447                                                            struct daemon_call_state);
448         struct ctdb_reply_call_old *r;
449         int res;
450         uint32_t length;
451         struct ctdb_client *client = dstate->client;
452         struct ctdb_db_context *ctdb_db = state->ctdb_db;
453
454         talloc_steal(client, dstate);
455         talloc_steal(dstate, dstate->call);
456
457         res = ctdb_daemon_call_recv(state, dstate->call);
458         if (res != 0) {
459                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
460                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
461
462                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
463                 return;
464         }
465
466         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
467         /* If the client asked for readonly FETCH, we remapped this to 
468            FETCH_WITH_HEADER when calling the daemon. So we must
469            strip the extra header off the reply data before passing
470            it back to the client.
471         */
472         if (dstate->readonly_fetch
473         && dstate->client_callid == CTDB_FETCH_FUNC) {
474                 length -= sizeof(struct ctdb_ltdb_header);
475         }
476
477         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
478                                length, struct ctdb_reply_call_old);
479         if (r == NULL) {
480                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
481                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
482                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
483                 return;
484         }
485         r->hdr.reqid        = dstate->reqid;
486         r->status           = dstate->call->status;
487
488         if (dstate->readonly_fetch
489         && dstate->client_callid == CTDB_FETCH_FUNC) {
490                 /* client only asked for a FETCH so we must strip off
491                    the extra ctdb_ltdb header
492                 */
493                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
494                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
495         } else {
496                 r->datalen          = dstate->call->reply_data.dsize;
497                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
498         }
499
500         res = daemon_queue_send(client, &r->hdr);
501         if (res == -1) {
502                 /* client is dead - return immediately */
503                 return;
504         }
505         if (res != 0) {
506                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
507         }
508         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
509         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
510         talloc_free(dstate);
511 }
512
513 struct ctdb_daemon_packet_wrap {
514         struct ctdb_context *ctdb;
515         uint32_t client_id;
516 };
517
518 /*
519   a wrapper to catch disconnected clients
520  */
521 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
522 {
523         struct ctdb_client *client;
524         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
525                                                             struct ctdb_daemon_packet_wrap);
526         if (w == NULL) {
527                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
528                 return;
529         }
530
531         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
532         if (client == NULL) {
533                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
534                          w->client_id));
535                 talloc_free(w);
536                 return;
537         }
538         talloc_free(w);
539
540         /* process it */
541         daemon_incoming_packet(client, hdr);    
542 }
543
544 struct ctdb_deferred_fetch_call {
545         struct ctdb_deferred_fetch_call *next, *prev;
546         struct ctdb_req_call_old *c;
547         struct ctdb_daemon_packet_wrap *w;
548 };
549
550 struct ctdb_deferred_fetch_queue {
551         struct ctdb_deferred_fetch_call *deferred_calls;
552 };
553
554 struct ctdb_deferred_requeue {
555         struct ctdb_deferred_fetch_call *dfc;
556         struct ctdb_client *client;
557 };
558
559 /* called from a timer event and starts reprocessing the deferred call.*/
560 static void reprocess_deferred_call(struct tevent_context *ev,
561                                     struct tevent_timer *te,
562                                     struct timeval t, void *private_data)
563 {
564         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
565         struct ctdb_client *client = dfr->client;
566
567         talloc_steal(client, dfr->dfc->c);
568         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
569         talloc_free(dfr);
570 }
571
572 /* the referral context is destroyed either after a timeout or when the initial
573    fetch-lock has finished.
574    at this stage, immediately start reprocessing the queued up deferred
575    calls so they get reprocessed immediately (and since we are dmaster at
576    this stage, trigger the waiting smbd processes to pick up and aquire the
577    record right away.
578 */
579 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
580 {
581
582         /* need to reprocess the packets from the queue explicitely instead of
583            just using a normal destructor since we want, need, to
584            call the clients in the same oder as the requests queued up
585         */
586         while (dfq->deferred_calls != NULL) {
587                 struct ctdb_client *client;
588                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
589                 struct ctdb_deferred_requeue *dfr;
590
591                 DLIST_REMOVE(dfq->deferred_calls, dfc);
592
593                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
594                 if (client == NULL) {
595                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
596                                  dfc->w->client_id));
597                         continue;
598                 }
599
600                 /* process it by pushing it back onto the eventloop */
601                 dfr = talloc(client, struct ctdb_deferred_requeue);
602                 if (dfr == NULL) {
603                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
604                         continue;
605                 }
606
607                 dfr->dfc    = talloc_steal(dfr, dfc);
608                 dfr->client = client;
609
610                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
611                                  reprocess_deferred_call, dfr);
612         }
613
614         return 0;
615 }
616
617 /* insert the new deferral context into the rb tree.
618    there should never be a pre-existing context here, but check for it
619    warn and destroy the previous context if there is already a deferral context
620    for this key.
621 */
622 static void *insert_dfq_callback(void *parm, void *data)
623 {
624         if (data) {
625                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
626                 talloc_free(data);
627         }
628         return parm;
629 }
630
631 /* if the original fetch-lock did not complete within a reasonable time,
632    free the context and context for all deferred requests to cause them to be
633    re-inserted into the event system.
634 */
635 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
636                         struct timeval t, void *private_data)
637 {
638         talloc_free(private_data);
639 }
640
641 /* This function is used in the local daemon to register a KEY in a database
642    for being "fetched"
643    While the remote fetch is in-flight, any futher attempts to re-fetch the
644    same record will be deferred until the fetch completes.
645 */
646 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
647 {
648         uint32_t *k;
649         struct ctdb_deferred_fetch_queue *dfq;
650
651         k = ctdb_key_to_idkey(call, call->key);
652         if (k == NULL) {
653                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
654                 return -1;
655         }
656
657         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
658         if (dfq == NULL) {
659                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
660                 talloc_free(k);
661                 return -1;
662         }
663         dfq->deferred_calls = NULL;
664
665         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
666
667         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
668
669         /* if the fetch havent completed in 30 seconds, just tear it all down
670            and let it try again as the events are reissued */
671         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
672                          dfq_timeout, dfq);
673
674         talloc_free(k);
675         return 0;
676 }
677
678 /* check if this is a duplicate request to a fetch already in-flight
679    if it is, make this call deferred to be reprocessed later when
680    the in-flight fetch completes.
681 */
682 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
683 {
684         uint32_t *k;
685         struct ctdb_deferred_fetch_queue *dfq;
686         struct ctdb_deferred_fetch_call *dfc;
687
688         k = ctdb_key_to_idkey(c, key);
689         if (k == NULL) {
690                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
691                 return -1;
692         }
693
694         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
695         if (dfq == NULL) {
696                 talloc_free(k);
697                 return -1;
698         }
699
700
701         talloc_free(k);
702
703         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
704         if (dfc == NULL) {
705                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
706                 return -1;
707         }
708
709         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
710         if (dfc->w == NULL) {
711                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
712                 talloc_free(dfc);
713                 return -1;
714         }
715
716         dfc->c = talloc_steal(dfc, c);
717         dfc->w->ctdb = ctdb_db->ctdb;
718         dfc->w->client_id = client->client_id;
719
720         DLIST_ADD_END(dfq->deferred_calls, dfc);
721
722         return 0;
723 }
724
725
726 /*
727   this is called when the ctdb daemon received a ctdb request call
728   from a local client over the unix domain socket
729  */
730 static void daemon_request_call_from_client(struct ctdb_client *client, 
731                                             struct ctdb_req_call_old *c)
732 {
733         struct ctdb_call_state *state;
734         struct ctdb_db_context *ctdb_db;
735         struct daemon_call_state *dstate;
736         struct ctdb_call *call;
737         struct ctdb_ltdb_header header;
738         TDB_DATA key, data;
739         int ret;
740         struct ctdb_context *ctdb = client->ctdb;
741         struct ctdb_daemon_packet_wrap *w;
742
743         CTDB_INCREMENT_STAT(ctdb, total_calls);
744         CTDB_INCREMENT_STAT(ctdb, pending_calls);
745
746         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
747         if (!ctdb_db) {
748                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
749                           c->db_id));
750                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
751                 return;
752         }
753
754         if (ctdb_db->unhealthy_reason) {
755                 /*
756                  * this is just a warning, as the tdb should be empty anyway,
757                  * and only persistent databases can be unhealthy, which doesn't
758                  * use this code patch
759                  */
760                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
761                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
762         }
763
764         key.dptr = c->data;
765         key.dsize = c->keylen;
766
767         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
768         CTDB_NO_MEMORY_VOID(ctdb, w);   
769
770         w->ctdb = ctdb;
771         w->client_id = client->client_id;
772
773         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
774                                            (struct ctdb_req_header *)c, &data,
775                                            daemon_incoming_packet_wrap, w, true);
776         if (ret == -2) {
777                 /* will retry later */
778                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
779                 return;
780         }
781
782         talloc_free(w);
783
784         if (ret != 0) {
785                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
786                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
787                 return;
788         }
789
790
791         /* check if this fetch request is a duplicate for a
792            request we already have in flight. If so defer it until
793            the first request completes.
794         */
795         if (ctdb->tunable.fetch_collapse == 1) {
796                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
797                         ret = ctdb_ltdb_unlock(ctdb_db, key);
798                         if (ret != 0) {
799                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
800                         }
801                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
802                         talloc_free(data.dptr);
803                         return;
804                 }
805         }
806
807         /* Dont do READONLY if we don't have a tracking database */
808         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
809                 c->flags &= ~CTDB_WANT_READONLY;
810         }
811
812         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
813                 header.flags &= ~CTDB_REC_RO_FLAGS;
814                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
815                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
816                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
817                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
818                 }
819                 /* and clear out the tracking data */
820                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
821                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
822                 }
823         }
824
825         /* if we are revoking, we must defer all other calls until the revoke
826          * had completed.
827          */
828         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
829                 talloc_free(data.dptr);
830                 ret = ctdb_ltdb_unlock(ctdb_db, key);
831
832                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
833                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
834                 }
835                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
836                 return;
837         }
838
839         if ((header.dmaster == ctdb->pnn)
840         && (!(c->flags & CTDB_WANT_READONLY))
841         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
842                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
843                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
844                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
845                 }
846                 ret = ctdb_ltdb_unlock(ctdb_db, key);
847
848                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
849                         ctdb_fatal(ctdb, "Failed to start record revoke");
850                 }
851                 talloc_free(data.dptr);
852
853                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
854                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
855                 }
856
857                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
858                 return;
859         }               
860
861         dstate = talloc(client, struct daemon_call_state);
862         if (dstate == NULL) {
863                 ret = ctdb_ltdb_unlock(ctdb_db, key);
864                 if (ret != 0) {
865                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
866                 }
867
868                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
869                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
870                 return;
871         }
872         dstate->start_time = timeval_current();
873         dstate->client = client;
874         dstate->reqid  = c->hdr.reqid;
875         talloc_steal(dstate, data.dptr);
876
877         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
878         if (call == NULL) {
879                 ret = ctdb_ltdb_unlock(ctdb_db, key);
880                 if (ret != 0) {
881                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
882                 }
883
884                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
885                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
886                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
887                 return;
888         }
889
890         dstate->readonly_fetch = 0;
891         call->call_id = c->callid;
892         call->key = key;
893         call->call_data.dptr = c->data + c->keylen;
894         call->call_data.dsize = c->calldatalen;
895         call->flags = c->flags;
896
897         if (c->flags & CTDB_WANT_READONLY) {
898                 /* client wants readonly record, so translate this into a 
899                    fetch with header. remember what the client asked for
900                    so we can remap the reply back to the proper format for
901                    the client in the reply
902                  */
903                 dstate->client_callid = call->call_id;
904                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
905                 dstate->readonly_fetch = 1;
906         }
907
908         if (header.dmaster == ctdb->pnn) {
909                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
910         } else {
911                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
912                 if (ctdb->tunable.fetch_collapse == 1) {
913                         /* This request triggered a remote fetch-lock.
914                            set up a deferral for this key so any additional
915                            fetch-locks are deferred until the current one
916                            finishes.
917                          */
918                         setup_deferred_fetch_locks(ctdb_db, call);
919                 }
920         }
921
922         ret = ctdb_ltdb_unlock(ctdb_db, key);
923         if (ret != 0) {
924                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
925         }
926
927         if (state == NULL) {
928                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
929                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
930                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
931                 return;
932         }
933         talloc_steal(state, dstate);
934         talloc_steal(client, state);
935
936         state->async.fn = daemon_call_from_client_callback;
937         state->async.private_data = dstate;
938 }
939
940
941 static void daemon_request_control_from_client(struct ctdb_client *client, 
942                                                struct ctdb_req_control_old *c);
943 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
944                                               struct ctdb_req_tunnel_old *c);
945
946 /* data contains a packet from the client */
947 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
948 {
949         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
950         TALLOC_CTX *tmp_ctx;
951         struct ctdb_context *ctdb = client->ctdb;
952
953         /* place the packet as a child of a tmp_ctx. We then use
954            talloc_free() below to free it. If any of the calls want
955            to keep it, then they will steal it somewhere else, and the
956            talloc_free() will be a no-op */
957         tmp_ctx = talloc_new(client);
958         talloc_steal(tmp_ctx, hdr);
959
960         if (hdr->ctdb_magic != CTDB_MAGIC) {
961                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
962                 goto done;
963         }
964
965         if (hdr->ctdb_version != CTDB_PROTOCOL) {
966                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
967                 goto done;
968         }
969
970         switch (hdr->operation) {
971         case CTDB_REQ_CALL:
972                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
973                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
974                 break;
975
976         case CTDB_REQ_MESSAGE:
977                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
978                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
979                 break;
980
981         case CTDB_REQ_CONTROL:
982                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
983                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
984                 break;
985
986         case CTDB_REQ_TUNNEL:
987                 CTDB_INCREMENT_STAT(ctdb, client.req_tunnel);
988                 daemon_request_tunnel_from_client(client, (struct ctdb_req_tunnel_old *)hdr);
989                 break;
990
991         default:
992                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
993                          hdr->operation));
994         }
995
996 done:
997         talloc_free(tmp_ctx);
998 }
999
1000 /*
1001   called when the daemon gets a incoming packet
1002  */
1003 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
1004 {
1005         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
1006         struct ctdb_req_header *hdr;
1007
1008         if (cnt == 0) {
1009                 talloc_free(client);
1010                 return;
1011         }
1012
1013         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
1014
1015         if (cnt < sizeof(*hdr)) {
1016                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
1017                                (unsigned)cnt);
1018                 return;
1019         }
1020         hdr = (struct ctdb_req_header *)data;
1021
1022         if (hdr->ctdb_magic != CTDB_MAGIC) {
1023                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
1024                 goto err_out;
1025         }
1026
1027         if (hdr->ctdb_version != CTDB_PROTOCOL) {
1028                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
1029                 goto err_out;
1030         }
1031
1032         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
1033                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
1034                  hdr->srcnode, hdr->destnode));
1035
1036         /* it is the responsibility of the incoming packet function to free 'data' */
1037         daemon_incoming_packet(client, hdr);
1038         return;
1039
1040 err_out:
1041         TALLOC_FREE(data);
1042 }
1043
1044
1045 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
1046 {
1047         if (client_pid->ctdb->client_pids != NULL) {
1048                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
1049         }
1050
1051         return 0;
1052 }
1053
1054 static int get_new_client_id(struct reqid_context *idr,
1055                              struct ctdb_client *client,
1056                              uint32_t *out)
1057 {
1058         uint32_t client_id;
1059
1060         client_id = reqid_new(idr, client);
1061         /*
1062          * Some places in the code (e.g. ctdb_control_db_attach(),
1063          * ctdb_control_db_detach()) assign a special meaning to
1064          * client_id 0.  The assumption is that if client_id is 0 then
1065          * the control has come from another daemon.  Therefore, we
1066          * should never return client_id == 0.
1067          */
1068         if (client_id == 0) {
1069                 /*
1070                  * Don't leak ID 0.  This is safe because the ID keeps
1071                  * increasing.  A test will be added to ensure that
1072                  * this doesn't change.
1073                  */
1074                 reqid_remove(idr, 0);
1075
1076                 client_id = reqid_new(idr, client);
1077         }
1078
1079         if (client_id == REQID_INVALID) {
1080                 return EINVAL;
1081         }
1082
1083         if (client_id == 0) {
1084                 /* Every other ID must have been used and we can't use 0 */
1085                 reqid_remove(idr, 0);
1086                 return EINVAL;
1087         }
1088
1089         *out = client_id;
1090         return 0;
1091 }
1092
1093 static void ctdb_accept_client(struct tevent_context *ev,
1094                                struct tevent_fd *fde, uint16_t flags,
1095                                void *private_data)
1096 {
1097         struct sockaddr_un addr;
1098         socklen_t len;
1099         int fd;
1100         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
1101         struct ctdb_client *client;
1102         struct ctdb_client_pid_list *client_pid;
1103         pid_t peer_pid = 0;
1104         int ret;
1105
1106         memset(&addr, 0, sizeof(addr));
1107         len = sizeof(addr);
1108         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
1109         if (fd == -1) {
1110                 return;
1111         }
1112         smb_set_close_on_exec(fd);
1113
1114         ret = set_blocking(fd, false);
1115         if (ret != 0) {
1116                 DEBUG(DEBUG_ERR,
1117                       (__location__
1118                        " failed to set socket non-blocking (%s)\n",
1119                        strerror(errno)));
1120                 close(fd);
1121                 return;
1122         }
1123
1124         set_close_on_exec(fd);
1125
1126         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
1127
1128         client = talloc_zero(ctdb, struct ctdb_client);
1129         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
1130                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
1131         }
1132
1133         client->ctdb = ctdb;
1134         client->fd = fd;
1135
1136         ret = get_new_client_id(ctdb->idr, client, &client->client_id);
1137         if (ret != 0) {
1138                 DBG_ERR("Unable to get client ID (%d)\n", ret);
1139                 close(fd);
1140                 talloc_free(client);
1141                 return;
1142         }
1143
1144         client->pid = peer_pid;
1145
1146         client_pid = talloc(client, struct ctdb_client_pid_list);
1147         if (client_pid == NULL) {
1148                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
1149                 close(fd);
1150                 talloc_free(client);
1151                 return;
1152         }               
1153         client_pid->ctdb   = ctdb;
1154         client_pid->pid    = peer_pid;
1155         client_pid->client = client;
1156
1157         DLIST_ADD(ctdb->client_pids, client_pid);
1158
1159         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
1160                                          ctdb_daemon_read_cb, client,
1161                                          "client-%u", client->pid);
1162
1163         talloc_set_destructor(client, ctdb_client_destructor);
1164         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
1165         ctdb->num_clients++;
1166 }
1167
1168
1169
1170 /*
1171   create a unix domain socket and bind it
1172   return a file descriptor open on the socket 
1173 */
1174 static int ux_socket_bind(struct ctdb_context *ctdb)
1175 {
1176         struct sockaddr_un addr;
1177         int ret;
1178
1179         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1180         if (ctdb->daemon.sd == -1) {
1181                 return -1;
1182         }
1183
1184         memset(&addr, 0, sizeof(addr));
1185         addr.sun_family = AF_UNIX;
1186         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1187
1188         if (! sock_clean(ctdb->daemon.name)) {
1189                 return -1;
1190         }
1191
1192         set_close_on_exec(ctdb->daemon.sd);
1193
1194         ret = set_blocking(ctdb->daemon.sd, false);
1195         if (ret != 0) {
1196                 DEBUG(DEBUG_ERR,
1197                       (__location__
1198                        " failed to set socket non-blocking (%s)\n",
1199                        strerror(errno)));
1200                 goto failed;
1201         }
1202
1203         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1204                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1205                 goto failed;
1206         }
1207
1208         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1209             chmod(ctdb->daemon.name, 0700) != 0) {
1210                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1211                 goto failed;
1212         }
1213
1214
1215         if (listen(ctdb->daemon.sd, 100) != 0) {
1216                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1217                 goto failed;
1218         }
1219
1220         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1221                              ctdb->daemon.name));
1222         return 0;
1223
1224 failed:
1225         close(ctdb->daemon.sd);
1226         ctdb->daemon.sd = -1;
1227         return -1;      
1228 }
1229
1230 static void initialise_node_flags (struct ctdb_context *ctdb)
1231 {
1232         unsigned int i;
1233
1234         /* Always found: PNN correctly set just before this is called */
1235         for (i = 0; i < ctdb->num_nodes; i++) {
1236                 if (ctdb->pnn == ctdb->nodes[i]->pnn) {
1237                         break;
1238                 }
1239         }
1240
1241         ctdb->nodes[i]->flags &= ~NODE_FLAGS_DISCONNECTED;
1242
1243         /* do we start out in DISABLED mode? */
1244         if (ctdb->start_as_disabled != 0) {
1245                 D_ERR("This node is configured to start in DISABLED state\n");
1246                 ctdb->nodes[i]->flags |= NODE_FLAGS_DISABLED;
1247         }
1248         /* do we start out in STOPPED mode? */
1249         if (ctdb->start_as_stopped != 0) {
1250                 D_ERR("This node is configured to start in STOPPED state\n");
1251                 ctdb->nodes[i]->flags |= NODE_FLAGS_STOPPED;
1252         }
1253 }
1254
1255 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1256                                       void *private_data)
1257 {
1258         if (status != 0) {
1259                 ctdb_die(ctdb, "Failed to run setup event");
1260         }
1261         ctdb_run_notification_script(ctdb, "setup");
1262
1263         /* Start the recovery daemon */
1264         if (ctdb_start_recoverd(ctdb) != 0) {
1265                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1266                 exit(11);
1267         }
1268
1269         ctdb_start_periodic_events(ctdb);
1270
1271         ctdb_wait_for_first_recovery(ctdb);
1272 }
1273
1274 static struct timeval tevent_before_wait_ts;
1275 static struct timeval tevent_after_wait_ts;
1276
1277 static void ctdb_tevent_trace_init(void)
1278 {
1279         struct timeval now;
1280
1281         now = timeval_current();
1282
1283         tevent_before_wait_ts = now;
1284         tevent_after_wait_ts = now;
1285 }
1286
1287 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1288                               void *private_data)
1289 {
1290         struct timeval diff;
1291         struct timeval now;
1292         struct ctdb_context *ctdb =
1293                 talloc_get_type(private_data, struct ctdb_context);
1294
1295         if (getpid() != ctdb->ctdbd_pid) {
1296                 return;
1297         }
1298
1299         now = timeval_current();
1300
1301         switch (tp) {
1302         case TEVENT_TRACE_BEFORE_WAIT:
1303                 diff = timeval_until(&tevent_after_wait_ts, &now);
1304                 if (diff.tv_sec > 3) {
1305                         DEBUG(DEBUG_ERR,
1306                               ("Handling event took %ld seconds!\n",
1307                                (long)diff.tv_sec));
1308                 }
1309                 tevent_before_wait_ts = now;
1310                 break;
1311
1312         case TEVENT_TRACE_AFTER_WAIT:
1313                 diff = timeval_until(&tevent_before_wait_ts, &now);
1314                 if (diff.tv_sec > 3) {
1315                         DEBUG(DEBUG_ERR,
1316                               ("No event for %ld seconds!\n",
1317                                (long)diff.tv_sec));
1318                 }
1319                 tevent_after_wait_ts = now;
1320                 break;
1321
1322         default:
1323                 /* Do nothing for future tevent trace points */ ;
1324         }
1325 }
1326
1327 static void ctdb_remove_pidfile(void)
1328 {
1329         TALLOC_FREE(ctdbd_pidfile_ctx);
1330 }
1331
1332 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1333 {
1334         if (ctdbd_pidfile != NULL) {
1335                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1336                                                  &ctdbd_pidfile_ctx);
1337                 if (ret != 0) {
1338                         DEBUG(DEBUG_ERR,
1339                               ("Failed to create PID file %s\n",
1340                                ctdbd_pidfile));
1341                         exit(11);
1342                 }
1343
1344                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1345                 atexit(ctdb_remove_pidfile);
1346         }
1347 }
1348
1349 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1350 {
1351         unsigned int i, j, count;
1352
1353         /* initialize the vnn mapping table, skipping any deleted nodes */
1354         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1355         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1356
1357         count = 0;
1358         for (i = 0; i < ctdb->num_nodes; i++) {
1359                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1360                         count++;
1361                 }
1362         }
1363
1364         ctdb->vnn_map->generation = INVALID_GENERATION;
1365         ctdb->vnn_map->size = count;
1366         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1367         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1368
1369         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1370                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1371                         continue;
1372                 }
1373                 ctdb->vnn_map->map[j] = i;
1374                 j++;
1375         }
1376 }
1377
1378 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1379 {
1380         if (ctdb->address == NULL) {
1381                 ctdb_fatal(ctdb,
1382                            "Can not determine PNN - node address is not set\n");
1383         }
1384
1385         ctdb->pnn = ctdb_ip_to_pnn(ctdb, ctdb->address);
1386         if (ctdb->pnn == CTDB_UNKNOWN_PNN) {
1387                 ctdb_fatal(ctdb,
1388                            "Can not determine PNN - unknown node address\n");
1389         }
1390
1391         D_NOTICE("PNN is %u\n", ctdb->pnn);
1392 }
1393
1394 /*
1395   start the protocol going as a daemon
1396 */
1397 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1398 {
1399         int res, ret = -1;
1400         struct tevent_fd *fde;
1401
1402         become_daemon(do_fork, !do_fork, false);
1403
1404         ignore_signal(SIGPIPE);
1405         ignore_signal(SIGUSR1);
1406
1407         ctdb->ctdbd_pid = getpid();
1408         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1409                           SAMBA_VERSION_STRING, ctdb->ctdbd_pid));
1410         ctdb_create_pidfile(ctdb);
1411
1412         /* create a unix domain stream socket to listen to */
1413         res = ux_socket_bind(ctdb);
1414         if (res!=0) {
1415                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1416                 exit(10);
1417         }
1418
1419         /* Make sure we log something when the daemon terminates.
1420          * This must be the first exit handler to run (so the last to
1421          * be registered.
1422          */
1423         __ctdbd_pid = getpid();
1424         atexit(print_exit_message);
1425
1426         if (ctdb->do_setsched) {
1427                 /* try to set us up as realtime */
1428                 if (!set_scheduler()) {
1429                         exit(1);
1430                 }
1431                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1432         }
1433
1434         ctdb->ev = tevent_context_init(NULL);
1435         if (ctdb->ev == NULL) {
1436                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1437                 exit(1);
1438         }
1439         tevent_loop_allow_nesting(ctdb->ev);
1440         ctdb_tevent_trace_init();
1441         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1442
1443         /* set up a handler to pick up sigchld */
1444         if (ctdb_init_sigchld(ctdb) == NULL) {
1445                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1446                 exit(1);
1447         }
1448
1449         if (do_fork) {
1450                 ctdb_set_child_logging(ctdb);
1451         }
1452
1453         TALLOC_FREE(ctdb->srv);
1454         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1455                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1456                 exit(1);
1457         }
1458
1459         TALLOC_FREE(ctdb->tunnels);
1460         if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1461                 DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1462                 exit(1);
1463         }
1464
1465         /* initialize statistics collection */
1466         ctdb_statistics_init(ctdb);
1467
1468         /* force initial recovery for election */
1469         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1470
1471         if (ctdb_start_eventd(ctdb) != 0) {
1472                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1473                 exit(1);
1474         }
1475
1476         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1477         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1478         if (ret != 0) {
1479                 ctdb_die(ctdb, "Failed to run init event\n");
1480         }
1481         ctdb_run_notification_script(ctdb, "init");
1482
1483         if (strcmp(ctdb->transport, "tcp") == 0) {
1484                 ret = ctdb_tcp_init(ctdb);
1485         }
1486 #ifdef USE_INFINIBAND
1487         if (strcmp(ctdb->transport, "ib") == 0) {
1488                 ret = ctdb_ibw_init(ctdb);
1489         }
1490 #endif
1491         if (ret != 0) {
1492                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1493                 return -1;
1494         }
1495
1496         if (ctdb->methods == NULL) {
1497                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1498                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1499         }
1500
1501         /* Initialise the transport.  This sets the node address if it
1502          * was not set via the command-line. */
1503         if (ctdb->methods->initialise(ctdb) != 0) {
1504                 ctdb_fatal(ctdb, "transport failed to initialise");
1505         }
1506
1507         ctdb_set_my_pnn(ctdb);
1508
1509         initialise_node_flags(ctdb);
1510
1511         ret = ctdb_set_public_addresses(ctdb, true);
1512         if (ret == -1) {
1513                 D_ERR("Unable to setup public IP addresses\n");
1514                 exit(1);
1515         }
1516
1517         ctdb_initialise_vnn_map(ctdb);
1518
1519         /* attach to existing databases */
1520         if (ctdb_attach_databases(ctdb) != 0) {
1521                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1522         }
1523
1524         /* start frozen, then let the first election sort things out */
1525         if (!ctdb_blocking_freeze(ctdb)) {
1526                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1527         }
1528
1529         /* now start accepting clients, only can do this once frozen */
1530         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1531                             ctdb_accept_client, ctdb);
1532         if (fde == NULL) {
1533                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1534         }
1535         tevent_fd_set_auto_close(fde);
1536
1537         /* Start the transport */
1538         if (ctdb->methods->start(ctdb) != 0) {
1539                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1540                 ctdb_fatal(ctdb, "transport failed to start");
1541         }
1542
1543         /* Recovery daemon and timed events are started from the
1544          * callback, only after the setup event completes
1545          * successfully.
1546          */
1547         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1548         ret = ctdb_event_script_callback(ctdb,
1549                                          ctdb,
1550                                          ctdb_setup_event_callback,
1551                                          ctdb,
1552                                          CTDB_EVENT_SETUP,
1553                                          "%s",
1554                                          "");
1555         if (ret != 0) {
1556                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1557                 exit(1);
1558         }
1559
1560         lockdown_memory(ctdb->valgrinding);
1561
1562         /* go into a wait loop to allow other nodes to complete */
1563         tevent_loop_wait(ctdb->ev);
1564
1565         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1566         exit(1);
1567 }
1568
1569 /*
1570   allocate a packet for use in daemon<->daemon communication
1571  */
1572 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1573                                                  TALLOC_CTX *mem_ctx, 
1574                                                  enum ctdb_operation operation, 
1575                                                  size_t length, size_t slength,
1576                                                  const char *type)
1577 {
1578         int size;
1579         struct ctdb_req_header *hdr;
1580
1581         length = MAX(length, slength);
1582         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1583
1584         if (ctdb->methods == NULL) {
1585                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1586                          operation, (unsigned)length));
1587                 return NULL;
1588         }
1589
1590         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1591         if (hdr == NULL) {
1592                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1593                          operation, (unsigned)length));
1594                 return NULL;
1595         }
1596         talloc_set_name_const(hdr, type);
1597         memset(hdr, 0, slength);
1598         hdr->length       = length;
1599         hdr->operation    = operation;
1600         hdr->ctdb_magic   = CTDB_MAGIC;
1601         hdr->ctdb_version = CTDB_PROTOCOL;
1602         hdr->generation   = ctdb->vnn_map->generation;
1603         hdr->srcnode      = ctdb->pnn;
1604
1605         return hdr;     
1606 }
1607
1608 struct daemon_control_state {
1609         struct daemon_control_state *next, *prev;
1610         struct ctdb_client *client;
1611         struct ctdb_req_control_old *c;
1612         uint32_t reqid;
1613         struct ctdb_node *node;
1614 };
1615
1616 /*
1617   callback when a control reply comes in
1618  */
1619 static void daemon_control_callback(struct ctdb_context *ctdb,
1620                                     int32_t status, TDB_DATA data, 
1621                                     const char *errormsg,
1622                                     void *private_data)
1623 {
1624         struct daemon_control_state *state = talloc_get_type(private_data, 
1625                                                              struct daemon_control_state);
1626         struct ctdb_client *client = state->client;
1627         struct ctdb_reply_control_old *r;
1628         size_t len;
1629         int ret;
1630
1631         /* construct a message to send to the client containing the data */
1632         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1633         if (errormsg) {
1634                 len += strlen(errormsg);
1635         }
1636         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1637                                struct ctdb_reply_control_old);
1638         CTDB_NO_MEMORY_VOID(ctdb, r);
1639
1640         r->hdr.reqid     = state->reqid;
1641         r->status        = status;
1642         r->datalen       = data.dsize;
1643         r->errorlen = 0;
1644         memcpy(&r->data[0], data.dptr, data.dsize);
1645         if (errormsg) {
1646                 r->errorlen = strlen(errormsg);
1647                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1648         }
1649
1650         ret = daemon_queue_send(client, &r->hdr);
1651         if (ret != -1) {
1652                 talloc_free(state);
1653         }
1654 }
1655
1656 /*
1657   fail all pending controls to a disconnected node
1658  */
1659 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1660 {
1661         struct daemon_control_state *state;
1662         while ((state = node->pending_controls)) {
1663                 DLIST_REMOVE(node->pending_controls, state);
1664                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1665                                         "node is disconnected", state);
1666         }
1667 }
1668
1669 /*
1670   destroy a daemon_control_state
1671  */
1672 static int daemon_control_destructor(struct daemon_control_state *state)
1673 {
1674         if (state->node) {
1675                 DLIST_REMOVE(state->node->pending_controls, state);
1676         }
1677         return 0;
1678 }
1679
1680 /*
1681   this is called when the ctdb daemon received a ctdb request control
1682   from a local client over the unix domain socket
1683  */
1684 static void daemon_request_control_from_client(struct ctdb_client *client, 
1685                                                struct ctdb_req_control_old *c)
1686 {
1687         TDB_DATA data;
1688         int res;
1689         struct daemon_control_state *state;
1690         TALLOC_CTX *tmp_ctx = talloc_new(client);
1691
1692         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1693                 c->hdr.destnode = client->ctdb->pnn;
1694         }
1695
1696         state = talloc(client, struct daemon_control_state);
1697         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1698
1699         state->client = client;
1700         state->c = talloc_steal(state, c);
1701         state->reqid = c->hdr.reqid;
1702         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1703                 state->node = client->ctdb->nodes[c->hdr.destnode];
1704                 DLIST_ADD(state->node->pending_controls, state);
1705         } else {
1706                 state->node = NULL;
1707         }
1708
1709         talloc_set_destructor(state, daemon_control_destructor);
1710
1711         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1712                 talloc_steal(tmp_ctx, state);
1713         }
1714         
1715         data.dptr = &c->data[0];
1716         data.dsize = c->datalen;
1717         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1718                                        c->srvid, c->opcode, client->client_id,
1719                                        c->flags,
1720                                        data, daemon_control_callback,
1721                                        state);
1722         if (res != 0) {
1723                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1724                          c->hdr.destnode));
1725         }
1726
1727         talloc_free(tmp_ctx);
1728 }
1729
1730 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
1731                                               struct ctdb_req_tunnel_old *c)
1732 {
1733         TDB_DATA data;
1734         int ret;
1735
1736         if (! ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1737                 DEBUG(DEBUG_ERR, ("Invalid destination 0x%x\n",
1738                                   c->hdr.destnode));
1739                 return;
1740         }
1741
1742         ret = srvid_exists(client->ctdb->tunnels, c->tunnel_id, NULL);
1743         if (ret != 0) {
1744                 DEBUG(DEBUG_ERR,
1745                       ("tunnel id 0x%"PRIx64" not registered, dropping pkt\n",
1746                        c->tunnel_id));
1747                 return;
1748         }
1749
1750         data = (TDB_DATA) {
1751                 .dsize = c->datalen,
1752                 .dptr = &c->data[0],
1753         };
1754
1755         ret = ctdb_daemon_send_tunnel(client->ctdb, c->hdr.destnode,
1756                                       c->tunnel_id, c->flags, data);
1757         if (ret != 0) {
1758                 DEBUG(DEBUG_ERR, ("Failed to set tunnel to remote note %u\n",
1759                                   c->hdr.destnode));
1760         }
1761 }
1762
1763 /*
1764   register a call function
1765 */
1766 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1767                          ctdb_fn_t fn, int id)
1768 {
1769         struct ctdb_registered_call *call;
1770         struct ctdb_db_context *ctdb_db;
1771
1772         ctdb_db = find_ctdb_db(ctdb, db_id);
1773         if (ctdb_db == NULL) {
1774                 return -1;
1775         }
1776
1777         call = talloc(ctdb_db, struct ctdb_registered_call);
1778         call->fn = fn;
1779         call->id = id;
1780
1781         DLIST_ADD(ctdb_db->calls, call);        
1782         return 0;
1783 }
1784
1785
1786
1787 /*
1788   this local messaging handler is ugly, but is needed to prevent
1789   recursion in ctdb_send_message() when the destination node is the
1790   same as the source node
1791  */
1792 struct ctdb_local_message {
1793         struct ctdb_context *ctdb;
1794         uint64_t srvid;
1795         TDB_DATA data;
1796 };
1797
1798 static void ctdb_local_message_trigger(struct tevent_context *ev,
1799                                        struct tevent_timer *te,
1800                                        struct timeval t, void *private_data)
1801 {
1802         struct ctdb_local_message *m = talloc_get_type(
1803                 private_data, struct ctdb_local_message);
1804
1805         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1806         talloc_free(m);
1807 }
1808
1809 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1810 {
1811         struct ctdb_local_message *m;
1812         m = talloc(ctdb, struct ctdb_local_message);
1813         CTDB_NO_MEMORY(ctdb, m);
1814
1815         m->ctdb = ctdb;
1816         m->srvid = srvid;
1817         m->data  = data;
1818         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1819         if (m->data.dptr == NULL) {
1820                 talloc_free(m);
1821                 return -1;
1822         }
1823
1824         /* this needs to be done as an event to prevent recursion */
1825         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1826                          ctdb_local_message_trigger, m);
1827         return 0;
1828 }
1829
1830 /*
1831   send a ctdb message
1832 */
1833 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1834                              uint64_t srvid, TDB_DATA data)
1835 {
1836         struct ctdb_req_message_old *r;
1837         int len;
1838
1839         if (ctdb->methods == NULL) {
1840                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1841                 return -1;
1842         }
1843
1844         /* see if this is a message to ourselves */
1845         if (pnn == ctdb->pnn) {
1846                 return ctdb_local_message(ctdb, srvid, data);
1847         }
1848
1849         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1850         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1851                                     struct ctdb_req_message_old);
1852         CTDB_NO_MEMORY(ctdb, r);
1853
1854         r->hdr.destnode  = pnn;
1855         r->srvid         = srvid;
1856         r->datalen       = data.dsize;
1857         memcpy(&r->data[0], data.dptr, data.dsize);
1858
1859         ctdb_queue_packet(ctdb, &r->hdr);
1860
1861         talloc_free(r);
1862         return 0;
1863 }
1864
1865
1866
1867 struct ctdb_client_notify_list {
1868         struct ctdb_client_notify_list *next, *prev;
1869         struct ctdb_context *ctdb;
1870         uint64_t srvid;
1871         TDB_DATA data;
1872 };
1873
1874
1875 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1876 {
1877         int ret;
1878
1879         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1880
1881         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1882         if (ret != 0) {
1883                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1884         }
1885
1886         return 0;
1887 }
1888
1889 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1890 {
1891         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1892         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1893         struct ctdb_client_notify_list *nl;
1894
1895         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1896
1897         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1898                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1899                 return -1;
1900         }
1901
1902         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1903                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1904                 return -1;
1905         }
1906
1907
1908         if (client == NULL) {
1909                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1910                 return -1;
1911         }
1912
1913         for(nl=client->notify; nl; nl=nl->next) {
1914                 if (nl->srvid == notify->srvid) {
1915                         break;
1916                 }
1917         }
1918         if (nl != NULL) {
1919                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1920                 return -1;
1921         }
1922
1923         nl = talloc(client, struct ctdb_client_notify_list);
1924         CTDB_NO_MEMORY(ctdb, nl);
1925         nl->ctdb       = ctdb;
1926         nl->srvid      = notify->srvid;
1927         nl->data.dsize = notify->len;
1928         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1929                                        nl->data.dsize);
1930         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1931         
1932         DLIST_ADD(client->notify, nl);
1933         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1934
1935         return 0;
1936 }
1937
1938 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1939 {
1940         uint64_t srvid = *(uint64_t *)indata.dptr;
1941         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1942         struct ctdb_client_notify_list *nl;
1943
1944         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1945
1946         if (client == NULL) {
1947                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1948                 return -1;
1949         }
1950
1951         for(nl=client->notify; nl; nl=nl->next) {
1952                 if (nl->srvid == srvid) {
1953                         break;
1954                 }
1955         }
1956         if (nl == NULL) {
1957                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1958                 return -1;
1959         }
1960
1961         DLIST_REMOVE(client->notify, nl);
1962         talloc_set_destructor(nl, NULL);
1963         talloc_free(nl);
1964
1965         return 0;
1966 }
1967
1968 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1969 {
1970         struct ctdb_client_pid_list *client_pid;
1971
1972         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1973                 if (client_pid->pid == pid) {
1974                         return client_pid->client;
1975                 }
1976         }
1977         return NULL;
1978 }
1979
1980
1981 /* This control is used by samba when probing if a process (of a samba daemon)
1982    exists on the node.
1983    Samba does this when it needs/wants to check if a subrecord in one of the
1984    databases is still valid, or if it is stale and can be removed.
1985    If the node is in unhealthy or stopped state we just kill of the samba
1986    process holding this sub-record and return to the calling samba that
1987    the process does not exist.
1988    This allows us to forcefully recall subrecords registered by samba processes
1989    on banned and stopped nodes.
1990 */
1991 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1992 {
1993         struct ctdb_client *client;
1994
1995         client = ctdb_find_client_by_pid(ctdb, pid);
1996         if (client == NULL) {
1997                 return -1;
1998         }
1999
2000         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
2001                 DEBUG(DEBUG_NOTICE,
2002                       ("Killing client with pid:%d on banned/stopped node\n",
2003                        (int)pid));
2004                 talloc_free(client);
2005                 return -1;
2006         }
2007
2008         return kill(pid, 0);
2009 }
2010
2011 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
2012                                      TDB_DATA indata)
2013 {
2014         struct ctdb_client_pid_list *client_pid;
2015         pid_t pid;
2016         uint64_t srvid;
2017         int ret;
2018
2019         pid = *(pid_t *)indata.dptr;
2020         srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
2021
2022         for (client_pid = ctdb->client_pids;
2023              client_pid != NULL;
2024              client_pid = client_pid->next) {
2025                 if (client_pid->pid == pid) {
2026                         ret = srvid_exists(ctdb->srv, srvid,
2027                                            client_pid->client);
2028                         if (ret == 0) {
2029                                 return 0;
2030                         }
2031                 }
2032         }
2033
2034         return -1;
2035 }
2036
2037 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
2038 {
2039         struct ctdb_node_map_old *node_map = NULL;
2040
2041         CHECK_CONTROL_DATA_SIZE(0);
2042
2043         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
2044         if (node_map == NULL) {
2045                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
2046                 return -1;
2047         }
2048
2049         outdata->dptr  = (unsigned char *)node_map;
2050         outdata->dsize = talloc_get_size(outdata->dptr);
2051
2052         return 0;
2053 }
2054
2055 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
2056 {
2057         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
2058                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
2059                 return;
2060         }
2061
2062         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
2063         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
2064         ctdb_stop_recoverd(ctdb);
2065         ctdb_stop_keepalive(ctdb);
2066         ctdb_stop_monitoring(ctdb);
2067         ctdb_release_all_ips(ctdb);
2068         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
2069         ctdb_stop_eventd(ctdb);
2070         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
2071                 ctdb->methods->shutdown(ctdb);
2072         }
2073
2074         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
2075         exit(exit_code);
2076 }
2077
2078 /* When forking the main daemon and the child process needs to connect
2079  * back to the daemon as a client process, this function can be used
2080  * to change the ctdb context from daemon into client mode.  The child
2081  * process must be created using ctdb_fork() and not fork() -
2082  * ctdb_fork() does some necessary housekeeping.
2083  */
2084 int switch_from_server_to_client(struct ctdb_context *ctdb)
2085 {
2086         int ret;
2087
2088         /* get a new event context */
2089         ctdb->ev = tevent_context_init(ctdb);
2090         if (ctdb->ev == NULL) {
2091                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
2092                 exit(1);
2093         }
2094         tevent_loop_allow_nesting(ctdb->ev);
2095
2096         /* Connect to main CTDB daemon */
2097         ret = ctdb_socket_connect(ctdb);
2098         if (ret != 0) {
2099                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
2100                 return -1;
2101         }
2102
2103         ctdb->can_send_controls = true;
2104
2105         return 0;
2106 }