8ae43517a8563ea99e084f525e991b76bde9ef3c
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "ctdb_version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75
76
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78                                   struct timeval t, void *private_data)
79 {
80         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81
82         if (getpid() != ctdb->ctdbd_pid) {
83                 return;
84         }
85
86         tevent_add_timer(ctdb->ev, ctdb,
87                          timeval_current_ofs(1, 0),
88                          ctdb_time_tick, ctdb);
89 }
90
91 /* Used to trigger a dummy event once per second, to make
92  * detection of hangs more reliable.
93  */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 {
96         tevent_add_timer(ctdb->ev, ctdb,
97                          timeval_current_ofs(1, 0),
98                          ctdb_time_tick, ctdb);
99 }
100
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
102 {
103         /* start monitoring for connected/disconnected nodes */
104         ctdb_start_keepalive(ctdb);
105
106         /* start periodic update of tcp tickle lists */
107         ctdb_start_tcp_tickle_update(ctdb);
108
109         /* start listening for recovery daemon pings */
110         ctdb_control_recd_ping(ctdb);
111
112         /* start listening to timer ticks */
113         ctdb_start_time_tickd(ctdb);
114 }
115
116 static void ignore_signal(int signum)
117 {
118         struct sigaction act;
119
120         memset(&act, 0, sizeof(act));
121
122         act.sa_handler = SIG_IGN;
123         sigemptyset(&act.sa_mask);
124         sigaddset(&act.sa_mask, signum);
125         sigaction(signum, &act, NULL);
126 }
127
128
129 /*
130   send a packet to a client
131  */
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 {
134         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135         if (hdr->operation == CTDB_REQ_MESSAGE) {
136                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138                         talloc_free(client);
139                         return -1;
140                 }
141         }
142         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
143 }
144
145 /*
146   message handler for when we are in daemon mode. This redirects the message
147   to the right client
148  */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150                                    void *private_data)
151 {
152         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153         struct ctdb_req_message_old *r;
154         int len;
155
156         /* construct a message to send to the client containing the data */
157         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159                                len, struct ctdb_req_message_old);
160         CTDB_NO_MEMORY_VOID(client->ctdb, r);
161
162         talloc_set_name_const(r, "req_message packet");
163
164         r->srvid         = srvid;
165         r->datalen       = data.dsize;
166         memcpy(&r->data[0], data.dptr, data.dsize);
167
168         daemon_queue_send(client, &r->hdr);
169
170         talloc_free(r);
171 }
172
173 /*
174   this is called when the ctdb daemon received a ctdb request to 
175   set the srvid from the client
176  */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 {
179         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180         int res;
181         if (client == NULL) {
182                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183                 return -1;
184         }
185         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186                              client);
187         if (res != 0) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
189                          (unsigned long long)srvid));
190         } else {
191                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
192                          (unsigned long long)srvid));
193         }
194
195         return res;
196 }
197
198 /*
199   this is called when the ctdb daemon received a ctdb request to 
200   remove a srvid from the client
201  */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 {
204         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205         if (client == NULL) {
206                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207                 return -1;
208         }
209         return srvid_deregister(ctdb->srv, srvid, client);
210 }
211
212 /*
213   destroy a ctdb_client
214 */
215 static int ctdb_client_destructor(struct ctdb_client *client)
216 {
217         struct ctdb_db_context *ctdb_db;
218
219         ctdb_takeover_client_destructor_hook(client);
220         reqid_remove(client->ctdb->idr, client->client_id);
221         client->ctdb->num_clients--;
222
223         if (client->num_persistent_updates != 0) {
224                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
225                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
226         }
227         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
228         if (ctdb_db) {
229                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
230                                   "commit active. Forcing recovery.\n"));
231                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
232
233                 /*
234                  * trans3 transaction state:
235                  *
236                  * The destructor sets the pointer to NULL.
237                  */
238                 talloc_free(ctdb_db->persistent_state);
239         }
240
241         return 0;
242 }
243
244
245 /*
246   this is called when the ctdb daemon received a ctdb request message
247   from a local client over the unix domain socket
248  */
249 static void daemon_request_message_from_client(struct ctdb_client *client, 
250                                                struct ctdb_req_message_old *c)
251 {
252         TDB_DATA data;
253         int res;
254
255         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
256                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
257         }
258
259         /* maybe the message is for another client on this node */
260         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
261                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
262                 return;
263         }
264
265         /* its for a remote node */
266         data.dptr = &c->data[0];
267         data.dsize = c->datalen;
268         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
269                                        c->srvid, data);
270         if (res != 0) {
271                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
272                          c->hdr.destnode));
273         }
274 }
275
276
277 struct daemon_call_state {
278         struct ctdb_client *client;
279         uint32_t reqid;
280         struct ctdb_call *call;
281         struct timeval start_time;
282
283         /* readonly request ? */
284         uint32_t readonly_fetch;
285         uint32_t client_callid;
286 };
287
288 /* 
289    complete a call from a client 
290 */
291 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
292 {
293         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
294                                                            struct daemon_call_state);
295         struct ctdb_reply_call_old *r;
296         int res;
297         uint32_t length;
298         struct ctdb_client *client = dstate->client;
299         struct ctdb_db_context *ctdb_db = state->ctdb_db;
300
301         talloc_steal(client, dstate);
302         talloc_steal(dstate, dstate->call);
303
304         res = ctdb_daemon_call_recv(state, dstate->call);
305         if (res != 0) {
306                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
307                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
308
309                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
310                 return;
311         }
312
313         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
314         /* If the client asked for readonly FETCH, we remapped this to 
315            FETCH_WITH_HEADER when calling the daemon. So we must
316            strip the extra header off the reply data before passing
317            it back to the client.
318         */
319         if (dstate->readonly_fetch
320         && dstate->client_callid == CTDB_FETCH_FUNC) {
321                 length -= sizeof(struct ctdb_ltdb_header);
322         }
323
324         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
325                                length, struct ctdb_reply_call_old);
326         if (r == NULL) {
327                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
328                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
329                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
330                 return;
331         }
332         r->hdr.reqid        = dstate->reqid;
333         r->status           = dstate->call->status;
334
335         if (dstate->readonly_fetch
336         && dstate->client_callid == CTDB_FETCH_FUNC) {
337                 /* client only asked for a FETCH so we must strip off
338                    the extra ctdb_ltdb header
339                 */
340                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
341                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
342         } else {
343                 r->datalen          = dstate->call->reply_data.dsize;
344                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
345         }
346
347         res = daemon_queue_send(client, &r->hdr);
348         if (res == -1) {
349                 /* client is dead - return immediately */
350                 return;
351         }
352         if (res != 0) {
353                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
354         }
355         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
356         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
357         talloc_free(dstate);
358 }
359
360 struct ctdb_daemon_packet_wrap {
361         struct ctdb_context *ctdb;
362         uint32_t client_id;
363 };
364
365 /*
366   a wrapper to catch disconnected clients
367  */
368 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
369 {
370         struct ctdb_client *client;
371         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
372                                                             struct ctdb_daemon_packet_wrap);
373         if (w == NULL) {
374                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
375                 return;
376         }
377
378         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
379         if (client == NULL) {
380                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
381                          w->client_id));
382                 talloc_free(w);
383                 return;
384         }
385         talloc_free(w);
386
387         /* process it */
388         daemon_incoming_packet(client, hdr);    
389 }
390
391 struct ctdb_deferred_fetch_call {
392         struct ctdb_deferred_fetch_call *next, *prev;
393         struct ctdb_req_call_old *c;
394         struct ctdb_daemon_packet_wrap *w;
395 };
396
397 struct ctdb_deferred_fetch_queue {
398         struct ctdb_deferred_fetch_call *deferred_calls;
399 };
400
401 struct ctdb_deferred_requeue {
402         struct ctdb_deferred_fetch_call *dfc;
403         struct ctdb_client *client;
404 };
405
406 /* called from a timer event and starts reprocessing the deferred call.*/
407 static void reprocess_deferred_call(struct tevent_context *ev,
408                                     struct tevent_timer *te,
409                                     struct timeval t, void *private_data)
410 {
411         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
412         struct ctdb_client *client = dfr->client;
413
414         talloc_steal(client, dfr->dfc->c);
415         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
416         talloc_free(dfr);
417 }
418
419 /* the referral context is destroyed either after a timeout or when the initial
420    fetch-lock has finished.
421    at this stage, immediately start reprocessing the queued up deferred
422    calls so they get reprocessed immediately (and since we are dmaster at
423    this stage, trigger the waiting smbd processes to pick up and aquire the
424    record right away.
425 */
426 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
427 {
428
429         /* need to reprocess the packets from the queue explicitely instead of
430            just using a normal destructor since we want, need, to
431            call the clients in the same oder as the requests queued up
432         */
433         while (dfq->deferred_calls != NULL) {
434                 struct ctdb_client *client;
435                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
436                 struct ctdb_deferred_requeue *dfr;
437
438                 DLIST_REMOVE(dfq->deferred_calls, dfc);
439
440                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
441                 if (client == NULL) {
442                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
443                                  dfc->w->client_id));
444                         continue;
445                 }
446
447                 /* process it by pushing it back onto the eventloop */
448                 dfr = talloc(client, struct ctdb_deferred_requeue);
449                 if (dfr == NULL) {
450                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
451                         continue;
452                 }
453
454                 dfr->dfc    = talloc_steal(dfr, dfc);
455                 dfr->client = client;
456
457                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
458                                  reprocess_deferred_call, dfr);
459         }
460
461         return 0;
462 }
463
464 /* insert the new deferral context into the rb tree.
465    there should never be a pre-existing context here, but check for it
466    warn and destroy the previous context if there is already a deferral context
467    for this key.
468 */
469 static void *insert_dfq_callback(void *parm, void *data)
470 {
471         if (data) {
472                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
473                 talloc_free(data);
474         }
475         return parm;
476 }
477
478 /* if the original fetch-lock did not complete within a reasonable time,
479    free the context and context for all deferred requests to cause them to be
480    re-inserted into the event system.
481 */
482 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
483                         struct timeval t, void *private_data)
484 {
485         talloc_free(private_data);
486 }
487
488 /* This function is used in the local daemon to register a KEY in a database
489    for being "fetched"
490    While the remote fetch is in-flight, any futher attempts to re-fetch the
491    same record will be deferred until the fetch completes.
492 */
493 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
494 {
495         uint32_t *k;
496         struct ctdb_deferred_fetch_queue *dfq;
497
498         k = ctdb_key_to_idkey(call, call->key);
499         if (k == NULL) {
500                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
501                 return -1;
502         }
503
504         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
505         if (dfq == NULL) {
506                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
507                 talloc_free(k);
508                 return -1;
509         }
510         dfq->deferred_calls = NULL;
511
512         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
513
514         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
515
516         /* if the fetch havent completed in 30 seconds, just tear it all down
517            and let it try again as the events are reissued */
518         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
519                          dfq_timeout, dfq);
520
521         talloc_free(k);
522         return 0;
523 }
524
525 /* check if this is a duplicate request to a fetch already in-flight
526    if it is, make this call deferred to be reprocessed later when
527    the in-flight fetch completes.
528 */
529 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
530 {
531         uint32_t *k;
532         struct ctdb_deferred_fetch_queue *dfq;
533         struct ctdb_deferred_fetch_call *dfc;
534
535         k = ctdb_key_to_idkey(c, key);
536         if (k == NULL) {
537                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
538                 return -1;
539         }
540
541         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
542         if (dfq == NULL) {
543                 talloc_free(k);
544                 return -1;
545         }
546
547
548         talloc_free(k);
549
550         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
551         if (dfc == NULL) {
552                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
553                 return -1;
554         }
555
556         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
557         if (dfc->w == NULL) {
558                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
559                 talloc_free(dfc);
560                 return -1;
561         }
562
563         dfc->c = talloc_steal(dfc, c);
564         dfc->w->ctdb = ctdb_db->ctdb;
565         dfc->w->client_id = client->client_id;
566
567         DLIST_ADD_END(dfq->deferred_calls, dfc);
568
569         return 0;
570 }
571
572
573 /*
574   this is called when the ctdb daemon received a ctdb request call
575   from a local client over the unix domain socket
576  */
577 static void daemon_request_call_from_client(struct ctdb_client *client, 
578                                             struct ctdb_req_call_old *c)
579 {
580         struct ctdb_call_state *state;
581         struct ctdb_db_context *ctdb_db;
582         struct daemon_call_state *dstate;
583         struct ctdb_call *call;
584         struct ctdb_ltdb_header header;
585         TDB_DATA key, data;
586         int ret;
587         struct ctdb_context *ctdb = client->ctdb;
588         struct ctdb_daemon_packet_wrap *w;
589
590         CTDB_INCREMENT_STAT(ctdb, total_calls);
591         CTDB_INCREMENT_STAT(ctdb, pending_calls);
592
593         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
594         if (!ctdb_db) {
595                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
596                           c->db_id));
597                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
598                 return;
599         }
600
601         if (ctdb_db->unhealthy_reason) {
602                 /*
603                  * this is just a warning, as the tdb should be empty anyway,
604                  * and only persistent databases can be unhealthy, which doesn't
605                  * use this code patch
606                  */
607                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
608                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
609         }
610
611         key.dptr = c->data;
612         key.dsize = c->keylen;
613
614         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
615         CTDB_NO_MEMORY_VOID(ctdb, w);   
616
617         w->ctdb = ctdb;
618         w->client_id = client->client_id;
619
620         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
621                                            (struct ctdb_req_header *)c, &data,
622                                            daemon_incoming_packet_wrap, w, true);
623         if (ret == -2) {
624                 /* will retry later */
625                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
626                 return;
627         }
628
629         talloc_free(w);
630
631         if (ret != 0) {
632                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
633                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
634                 return;
635         }
636
637
638         /* check if this fetch request is a duplicate for a
639            request we already have in flight. If so defer it until
640            the first request completes.
641         */
642         if (ctdb->tunable.fetch_collapse == 1) {
643                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
644                         ret = ctdb_ltdb_unlock(ctdb_db, key);
645                         if (ret != 0) {
646                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
647                         }
648                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
649                         talloc_free(data.dptr);
650                         return;
651                 }
652         }
653
654         /* Dont do READONLY if we don't have a tracking database */
655         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
656                 c->flags &= ~CTDB_WANT_READONLY;
657         }
658
659         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
660                 header.flags &= ~CTDB_REC_RO_FLAGS;
661                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
662                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
663                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
664                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
665                 }
666                 /* and clear out the tracking data */
667                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
668                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
669                 }
670         }
671
672         /* if we are revoking, we must defer all other calls until the revoke
673          * had completed.
674          */
675         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
676                 talloc_free(data.dptr);
677                 ret = ctdb_ltdb_unlock(ctdb_db, key);
678
679                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
680                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
681                 }
682                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
683                 return;
684         }
685
686         if ((header.dmaster == ctdb->pnn)
687         && (!(c->flags & CTDB_WANT_READONLY))
688         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
689                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
690                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
691                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
692                 }
693                 ret = ctdb_ltdb_unlock(ctdb_db, key);
694
695                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
696                         ctdb_fatal(ctdb, "Failed to start record revoke");
697                 }
698                 talloc_free(data.dptr);
699
700                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
701                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
702                 }
703
704                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
705                 return;
706         }               
707
708         dstate = talloc(client, struct daemon_call_state);
709         if (dstate == NULL) {
710                 ret = ctdb_ltdb_unlock(ctdb_db, key);
711                 if (ret != 0) {
712                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
713                 }
714
715                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
716                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
717                 return;
718         }
719         dstate->start_time = timeval_current();
720         dstate->client = client;
721         dstate->reqid  = c->hdr.reqid;
722         talloc_steal(dstate, data.dptr);
723
724         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
725         if (call == NULL) {
726                 ret = ctdb_ltdb_unlock(ctdb_db, key);
727                 if (ret != 0) {
728                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
729                 }
730
731                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
732                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
733                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
734                 return;
735         }
736
737         dstate->readonly_fetch = 0;
738         call->call_id = c->callid;
739         call->key = key;
740         call->call_data.dptr = c->data + c->keylen;
741         call->call_data.dsize = c->calldatalen;
742         call->flags = c->flags;
743
744         if (c->flags & CTDB_WANT_READONLY) {
745                 /* client wants readonly record, so translate this into a 
746                    fetch with header. remember what the client asked for
747                    so we can remap the reply back to the proper format for
748                    the client in the reply
749                  */
750                 dstate->client_callid = call->call_id;
751                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
752                 dstate->readonly_fetch = 1;
753         }
754
755         if (header.dmaster == ctdb->pnn) {
756                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
757         } else {
758                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
759                 if (ctdb->tunable.fetch_collapse == 1) {
760                         /* This request triggered a remote fetch-lock.
761                            set up a deferral for this key so any additional
762                            fetch-locks are deferred until the current one
763                            finishes.
764                          */
765                         setup_deferred_fetch_locks(ctdb_db, call);
766                 }
767         }
768
769         ret = ctdb_ltdb_unlock(ctdb_db, key);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
772         }
773
774         if (state == NULL) {
775                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
776                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
777                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
778                 return;
779         }
780         talloc_steal(state, dstate);
781         talloc_steal(client, state);
782
783         state->async.fn = daemon_call_from_client_callback;
784         state->async.private_data = dstate;
785 }
786
787
788 static void daemon_request_control_from_client(struct ctdb_client *client, 
789                                                struct ctdb_req_control_old *c);
790
791 /* data contains a packet from the client */
792 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
793 {
794         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
795         TALLOC_CTX *tmp_ctx;
796         struct ctdb_context *ctdb = client->ctdb;
797
798         /* place the packet as a child of a tmp_ctx. We then use
799            talloc_free() below to free it. If any of the calls want
800            to keep it, then they will steal it somewhere else, and the
801            talloc_free() will be a no-op */
802         tmp_ctx = talloc_new(client);
803         talloc_steal(tmp_ctx, hdr);
804
805         if (hdr->ctdb_magic != CTDB_MAGIC) {
806                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
807                 goto done;
808         }
809
810         if (hdr->ctdb_version != CTDB_PROTOCOL) {
811                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
812                 goto done;
813         }
814
815         switch (hdr->operation) {
816         case CTDB_REQ_CALL:
817                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
818                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
819                 break;
820
821         case CTDB_REQ_MESSAGE:
822                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
823                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
824                 break;
825
826         case CTDB_REQ_CONTROL:
827                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
828                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
829                 break;
830
831         default:
832                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
833                          hdr->operation));
834         }
835
836 done:
837         talloc_free(tmp_ctx);
838 }
839
840 /*
841   called when the daemon gets a incoming packet
842  */
843 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
844 {
845         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
846         struct ctdb_req_header *hdr;
847
848         if (cnt == 0) {
849                 talloc_free(client);
850                 return;
851         }
852
853         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
854
855         if (cnt < sizeof(*hdr)) {
856                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
857                                (unsigned)cnt);
858                 return;
859         }
860         hdr = (struct ctdb_req_header *)data;
861         if (cnt != hdr->length) {
862                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
863                                (unsigned)hdr->length, (unsigned)cnt);
864                 return;
865         }
866
867         if (hdr->ctdb_magic != CTDB_MAGIC) {
868                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
869                 return;
870         }
871
872         if (hdr->ctdb_version != CTDB_PROTOCOL) {
873                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
874                 return;
875         }
876
877         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
878                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
879                  hdr->srcnode, hdr->destnode));
880
881         /* it is the responsibility of the incoming packet function to free 'data' */
882         daemon_incoming_packet(client, hdr);
883 }
884
885
886 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
887 {
888         if (client_pid->ctdb->client_pids != NULL) {
889                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
890         }
891
892         return 0;
893 }
894
895
896 static void ctdb_accept_client(struct tevent_context *ev,
897                                struct tevent_fd *fde, uint16_t flags,
898                                void *private_data)
899 {
900         struct sockaddr_un addr;
901         socklen_t len;
902         int fd;
903         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
904         struct ctdb_client *client;
905         struct ctdb_client_pid_list *client_pid;
906         pid_t peer_pid = 0;
907         int ret;
908
909         memset(&addr, 0, sizeof(addr));
910         len = sizeof(addr);
911         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
912         if (fd == -1) {
913                 return;
914         }
915
916         ret = set_blocking(fd, false);
917         if (ret != 0) {
918                 DEBUG(DEBUG_ERR,
919                       (__location__
920                        " failed to set socket non-blocking (%s)\n",
921                        strerror(errno)));
922                 close(fd);
923                 return;
924         }
925
926         set_close_on_exec(fd);
927
928         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
929
930         client = talloc_zero(ctdb, struct ctdb_client);
931         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
932                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
933         }
934
935         client->ctdb = ctdb;
936         client->fd = fd;
937         client->client_id = reqid_new(ctdb->idr, client);
938         client->pid = peer_pid;
939
940         client_pid = talloc(client, struct ctdb_client_pid_list);
941         if (client_pid == NULL) {
942                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
943                 close(fd);
944                 talloc_free(client);
945                 return;
946         }               
947         client_pid->ctdb   = ctdb;
948         client_pid->pid    = peer_pid;
949         client_pid->client = client;
950
951         DLIST_ADD(ctdb->client_pids, client_pid);
952
953         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
954                                          ctdb_daemon_read_cb, client,
955                                          "client-%u", client->pid);
956
957         talloc_set_destructor(client, ctdb_client_destructor);
958         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
959         ctdb->num_clients++;
960 }
961
962
963
964 /*
965   create a unix domain socket and bind it
966   return a file descriptor open on the socket 
967 */
968 static int ux_socket_bind(struct ctdb_context *ctdb)
969 {
970         struct sockaddr_un addr;
971         int ret;
972
973         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
974         if (ctdb->daemon.sd == -1) {
975                 return -1;
976         }
977
978         memset(&addr, 0, sizeof(addr));
979         addr.sun_family = AF_UNIX;
980         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
981
982         if (! sock_clean(ctdb->daemon.name)) {
983                 return -1;
984         }
985
986         set_close_on_exec(ctdb->daemon.sd);
987
988         ret = set_blocking(ctdb->daemon.sd, false);
989         if (ret != 0) {
990                 DEBUG(DEBUG_ERR,
991                       (__location__
992                        " failed to set socket non-blocking (%s)\n",
993                        strerror(errno)));
994                 goto failed;
995         }
996
997         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
998                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
999                 goto failed;
1000         }
1001
1002         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1003             chmod(ctdb->daemon.name, 0700) != 0) {
1004                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1005                 goto failed;
1006         }
1007
1008
1009         if (listen(ctdb->daemon.sd, 100) != 0) {
1010                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1011                 goto failed;
1012         }
1013
1014         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1015                              ctdb->daemon.name));
1016         return 0;
1017
1018 failed:
1019         close(ctdb->daemon.sd);
1020         ctdb->daemon.sd = -1;
1021         return -1;      
1022 }
1023
1024 static void initialise_node_flags (struct ctdb_context *ctdb)
1025 {
1026         if (ctdb->pnn == -1) {
1027                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1028         }
1029
1030         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1031
1032         /* do we start out in DISABLED mode? */
1033         if (ctdb->start_as_disabled != 0) {
1034                 DEBUG(DEBUG_ERR,
1035                       ("This node is configured to start in DISABLED state\n"));
1036                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1037         }
1038         /* do we start out in STOPPED mode? */
1039         if (ctdb->start_as_stopped != 0) {
1040                 DEBUG(DEBUG_ERR,
1041                       ("This node is configured to start in STOPPED state\n"));
1042                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1043         }
1044 }
1045
1046 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1047                                       void *private_data)
1048 {
1049         if (status != 0) {
1050                 ctdb_die(ctdb, "Failed to run setup event");
1051         }
1052         ctdb_run_notification_script(ctdb, "setup");
1053
1054         /* tell all other nodes we've just started up */
1055         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1056                                  0, CTDB_CONTROL_STARTUP, 0,
1057                                  CTDB_CTRL_FLAG_NOREPLY,
1058                                  tdb_null, NULL, NULL);
1059
1060         /* Start the recovery daemon */
1061         if (ctdb_start_recoverd(ctdb) != 0) {
1062                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1063                 exit(11);
1064         }
1065
1066         ctdb_start_periodic_events(ctdb);
1067
1068         ctdb_wait_for_first_recovery(ctdb);
1069 }
1070
1071 static struct timeval tevent_before_wait_ts;
1072 static struct timeval tevent_after_wait_ts;
1073
1074 static void ctdb_tevent_trace_init(void)
1075 {
1076         struct timeval now;
1077
1078         now = timeval_current();
1079
1080         tevent_before_wait_ts = now;
1081         tevent_after_wait_ts = now;
1082 }
1083
1084 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1085                               void *private_data)
1086 {
1087         struct timeval diff;
1088         struct timeval now;
1089         struct ctdb_context *ctdb =
1090                 talloc_get_type(private_data, struct ctdb_context);
1091
1092         if (getpid() != ctdb->ctdbd_pid) {
1093                 return;
1094         }
1095
1096         now = timeval_current();
1097
1098         switch (tp) {
1099         case TEVENT_TRACE_BEFORE_WAIT:
1100                 diff = timeval_until(&tevent_after_wait_ts, &now);
1101                 if (diff.tv_sec > 3) {
1102                         DEBUG(DEBUG_ERR,
1103                               ("Handling event took %ld seconds!\n",
1104                                (long)diff.tv_sec));
1105                 }
1106                 tevent_before_wait_ts = now;
1107                 break;
1108
1109         case TEVENT_TRACE_AFTER_WAIT:
1110                 diff = timeval_until(&tevent_before_wait_ts, &now);
1111                 if (diff.tv_sec > 3) {
1112                         DEBUG(DEBUG_ERR,
1113                               ("No event for %ld seconds!\n",
1114                                (long)diff.tv_sec));
1115                 }
1116                 tevent_after_wait_ts = now;
1117                 break;
1118
1119         default:
1120                 /* Do nothing for future tevent trace points */ ;
1121         }
1122 }
1123
1124 static void ctdb_remove_pidfile(void)
1125 {
1126         TALLOC_FREE(ctdbd_pidfile_ctx);
1127 }
1128
1129 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1130 {
1131         if (ctdbd_pidfile != NULL) {
1132                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1133                                                  &ctdbd_pidfile_ctx);
1134                 if (ret != 0) {
1135                         DEBUG(DEBUG_ERR,
1136                               ("Failed to create PID file %s\n",
1137                                ctdbd_pidfile));
1138                         exit(11);
1139                 }
1140
1141                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1142                 atexit(ctdb_remove_pidfile);
1143         }
1144 }
1145
1146 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1147 {
1148         int i, j, count;
1149
1150         /* initialize the vnn mapping table, skipping any deleted nodes */
1151         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1152         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1153
1154         count = 0;
1155         for (i = 0; i < ctdb->num_nodes; i++) {
1156                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1157                         count++;
1158                 }
1159         }
1160
1161         ctdb->vnn_map->generation = INVALID_GENERATION;
1162         ctdb->vnn_map->size = count;
1163         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1164         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1165
1166         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1167                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1168                         continue;
1169                 }
1170                 ctdb->vnn_map->map[j] = i;
1171                 j++;
1172         }
1173 }
1174
1175 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1176 {
1177         int nodeid;
1178
1179         if (ctdb->address == NULL) {
1180                 ctdb_fatal(ctdb,
1181                            "Can not determine PNN - node address is not set\n");
1182         }
1183
1184         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1185         if (nodeid == -1) {
1186                 ctdb_fatal(ctdb,
1187                            "Can not determine PNN - node address not found in node list\n");
1188         }
1189
1190         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1191         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1192 }
1193
1194 /*
1195   start the protocol going as a daemon
1196 */
1197 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1198 {
1199         int res, ret = -1;
1200         struct tevent_fd *fde;
1201
1202         become_daemon(do_fork, false, false);
1203
1204         ignore_signal(SIGPIPE);
1205         ignore_signal(SIGUSR1);
1206
1207         ctdb->ctdbd_pid = getpid();
1208         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1209                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1210         ctdb_create_pidfile(ctdb);
1211
1212         /* create a unix domain stream socket to listen to */
1213         res = ux_socket_bind(ctdb);
1214         if (res!=0) {
1215                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1216                 exit(10);
1217         }
1218
1219         /* Make sure we log something when the daemon terminates.
1220          * This must be the first exit handler to run (so the last to
1221          * be registered.
1222          */
1223         __ctdbd_pid = getpid();
1224         atexit(print_exit_message);
1225
1226         if (ctdb->do_setsched) {
1227                 /* try to set us up as realtime */
1228                 if (!set_scheduler()) {
1229                         exit(1);
1230                 }
1231                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1232         }
1233
1234         ctdb->ev = tevent_context_init(NULL);
1235         if (ctdb->ev == NULL) {
1236                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1237                 exit(1);
1238         }
1239         tevent_loop_allow_nesting(ctdb->ev);
1240         ctdb_tevent_trace_init();
1241         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1242
1243         /* set up a handler to pick up sigchld */
1244         if (ctdb_init_sigchld(ctdb) == NULL) {
1245                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1246                 exit(1);
1247         }
1248
1249         if (do_fork) {
1250                 ctdb_set_child_logging(ctdb);
1251         }
1252
1253         TALLOC_FREE(ctdb->srv);
1254         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1255                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1256                 exit(1);
1257         }
1258
1259         /* initialize statistics collection */
1260         ctdb_statistics_init(ctdb);
1261
1262         /* force initial recovery for election */
1263         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1264
1265         if (ctdb_start_eventd(ctdb) != 0) {
1266                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1267                 exit(1);
1268         }
1269
1270         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1271         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1272         if (ret != 0) {
1273                 ctdb_die(ctdb, "Failed to run init event\n");
1274         }
1275         ctdb_run_notification_script(ctdb, "init");
1276
1277         if (strcmp(ctdb->transport, "tcp") == 0) {
1278                 ret = ctdb_tcp_init(ctdb);
1279         }
1280 #ifdef USE_INFINIBAND
1281         if (strcmp(ctdb->transport, "ib") == 0) {
1282                 ret = ctdb_ibw_init(ctdb);
1283         }
1284 #endif
1285         if (ret != 0) {
1286                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1287                 return -1;
1288         }
1289
1290         if (ctdb->methods == NULL) {
1291                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1292                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1293         }
1294
1295         /* Initialise the transport.  This sets the node address if it
1296          * was not set via the command-line. */
1297         if (ctdb->methods->initialise(ctdb) != 0) {
1298                 ctdb_fatal(ctdb, "transport failed to initialise");
1299         }
1300
1301         ctdb_set_my_pnn(ctdb);
1302
1303         initialise_node_flags(ctdb);
1304
1305         if (ctdb->public_addresses_file) {
1306                 ret = ctdb_set_public_addresses(ctdb, true);
1307                 if (ret == -1) {
1308                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1309                         exit(1);
1310                 }
1311         }
1312
1313         ctdb_initialise_vnn_map(ctdb);
1314
1315         /* attach to existing databases */
1316         if (ctdb_attach_databases(ctdb) != 0) {
1317                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1318         }
1319
1320         /* start frozen, then let the first election sort things out */
1321         if (!ctdb_blocking_freeze(ctdb)) {
1322                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1323         }
1324
1325         /* now start accepting clients, only can do this once frozen */
1326         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1327                             ctdb_accept_client, ctdb);
1328         if (fde == NULL) {
1329                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1330         }
1331         tevent_fd_set_auto_close(fde);
1332
1333         /* Start the transport */
1334         if (ctdb->methods->start(ctdb) != 0) {
1335                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1336                 ctdb_fatal(ctdb, "transport failed to start");
1337         }
1338
1339         /* Recovery daemon and timed events are started from the
1340          * callback, only after the setup event completes
1341          * successfully.
1342          */
1343         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1344         ret = ctdb_event_script_callback(ctdb,
1345                                          ctdb,
1346                                          ctdb_setup_event_callback,
1347                                          ctdb,
1348                                          CTDB_EVENT_SETUP,
1349                                          "%s",
1350                                          "");
1351         if (ret != 0) {
1352                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1353                 exit(1);
1354         }
1355
1356         lockdown_memory(ctdb->valgrinding);
1357
1358         /* go into a wait loop to allow other nodes to complete */
1359         tevent_loop_wait(ctdb->ev);
1360
1361         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1362         exit(1);
1363 }
1364
1365 /*
1366   allocate a packet for use in daemon<->daemon communication
1367  */
1368 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1369                                                  TALLOC_CTX *mem_ctx, 
1370                                                  enum ctdb_operation operation, 
1371                                                  size_t length, size_t slength,
1372                                                  const char *type)
1373 {
1374         int size;
1375         struct ctdb_req_header *hdr;
1376
1377         length = MAX(length, slength);
1378         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1379
1380         if (ctdb->methods == NULL) {
1381                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1382                          operation, (unsigned)length));
1383                 return NULL;
1384         }
1385
1386         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1387         if (hdr == NULL) {
1388                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1389                          operation, (unsigned)length));
1390                 return NULL;
1391         }
1392         talloc_set_name_const(hdr, type);
1393         memset(hdr, 0, slength);
1394         hdr->length       = length;
1395         hdr->operation    = operation;
1396         hdr->ctdb_magic   = CTDB_MAGIC;
1397         hdr->ctdb_version = CTDB_PROTOCOL;
1398         hdr->generation   = ctdb->vnn_map->generation;
1399         hdr->srcnode      = ctdb->pnn;
1400
1401         return hdr;     
1402 }
1403
1404 struct daemon_control_state {
1405         struct daemon_control_state *next, *prev;
1406         struct ctdb_client *client;
1407         struct ctdb_req_control_old *c;
1408         uint32_t reqid;
1409         struct ctdb_node *node;
1410 };
1411
1412 /*
1413   callback when a control reply comes in
1414  */
1415 static void daemon_control_callback(struct ctdb_context *ctdb,
1416                                     int32_t status, TDB_DATA data, 
1417                                     const char *errormsg,
1418                                     void *private_data)
1419 {
1420         struct daemon_control_state *state = talloc_get_type(private_data, 
1421                                                              struct daemon_control_state);
1422         struct ctdb_client *client = state->client;
1423         struct ctdb_reply_control_old *r;
1424         size_t len;
1425         int ret;
1426
1427         /* construct a message to send to the client containing the data */
1428         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1429         if (errormsg) {
1430                 len += strlen(errormsg);
1431         }
1432         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1433                                struct ctdb_reply_control_old);
1434         CTDB_NO_MEMORY_VOID(ctdb, r);
1435
1436         r->hdr.reqid     = state->reqid;
1437         r->status        = status;
1438         r->datalen       = data.dsize;
1439         r->errorlen = 0;
1440         memcpy(&r->data[0], data.dptr, data.dsize);
1441         if (errormsg) {
1442                 r->errorlen = strlen(errormsg);
1443                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1444         }
1445
1446         ret = daemon_queue_send(client, &r->hdr);
1447         if (ret != -1) {
1448                 talloc_free(state);
1449         }
1450 }
1451
1452 /*
1453   fail all pending controls to a disconnected node
1454  */
1455 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1456 {
1457         struct daemon_control_state *state;
1458         while ((state = node->pending_controls)) {
1459                 DLIST_REMOVE(node->pending_controls, state);
1460                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1461                                         "node is disconnected", state);
1462         }
1463 }
1464
1465 /*
1466   destroy a daemon_control_state
1467  */
1468 static int daemon_control_destructor(struct daemon_control_state *state)
1469 {
1470         if (state->node) {
1471                 DLIST_REMOVE(state->node->pending_controls, state);
1472         }
1473         return 0;
1474 }
1475
1476 /*
1477   this is called when the ctdb daemon received a ctdb request control
1478   from a local client over the unix domain socket
1479  */
1480 static void daemon_request_control_from_client(struct ctdb_client *client, 
1481                                                struct ctdb_req_control_old *c)
1482 {
1483         TDB_DATA data;
1484         int res;
1485         struct daemon_control_state *state;
1486         TALLOC_CTX *tmp_ctx = talloc_new(client);
1487
1488         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1489                 c->hdr.destnode = client->ctdb->pnn;
1490         }
1491
1492         state = talloc(client, struct daemon_control_state);
1493         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1494
1495         state->client = client;
1496         state->c = talloc_steal(state, c);
1497         state->reqid = c->hdr.reqid;
1498         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1499                 state->node = client->ctdb->nodes[c->hdr.destnode];
1500                 DLIST_ADD(state->node->pending_controls, state);
1501         } else {
1502                 state->node = NULL;
1503         }
1504
1505         talloc_set_destructor(state, daemon_control_destructor);
1506
1507         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1508                 talloc_steal(tmp_ctx, state);
1509         }
1510         
1511         data.dptr = &c->data[0];
1512         data.dsize = c->datalen;
1513         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1514                                        c->srvid, c->opcode, client->client_id,
1515                                        c->flags,
1516                                        data, daemon_control_callback,
1517                                        state);
1518         if (res != 0) {
1519                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1520                          c->hdr.destnode));
1521         }
1522
1523         talloc_free(tmp_ctx);
1524 }
1525
1526 /*
1527   register a call function
1528 */
1529 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1530                          ctdb_fn_t fn, int id)
1531 {
1532         struct ctdb_registered_call *call;
1533         struct ctdb_db_context *ctdb_db;
1534
1535         ctdb_db = find_ctdb_db(ctdb, db_id);
1536         if (ctdb_db == NULL) {
1537                 return -1;
1538         }
1539
1540         call = talloc(ctdb_db, struct ctdb_registered_call);
1541         call->fn = fn;
1542         call->id = id;
1543
1544         DLIST_ADD(ctdb_db->calls, call);        
1545         return 0;
1546 }
1547
1548
1549
1550 /*
1551   this local messaging handler is ugly, but is needed to prevent
1552   recursion in ctdb_send_message() when the destination node is the
1553   same as the source node
1554  */
1555 struct ctdb_local_message {
1556         struct ctdb_context *ctdb;
1557         uint64_t srvid;
1558         TDB_DATA data;
1559 };
1560
1561 static void ctdb_local_message_trigger(struct tevent_context *ev,
1562                                        struct tevent_timer *te,
1563                                        struct timeval t, void *private_data)
1564 {
1565         struct ctdb_local_message *m = talloc_get_type(
1566                 private_data, struct ctdb_local_message);
1567
1568         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1569         talloc_free(m);
1570 }
1571
1572 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1573 {
1574         struct ctdb_local_message *m;
1575         m = talloc(ctdb, struct ctdb_local_message);
1576         CTDB_NO_MEMORY(ctdb, m);
1577
1578         m->ctdb = ctdb;
1579         m->srvid = srvid;
1580         m->data  = data;
1581         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1582         if (m->data.dptr == NULL) {
1583                 talloc_free(m);
1584                 return -1;
1585         }
1586
1587         /* this needs to be done as an event to prevent recursion */
1588         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1589                          ctdb_local_message_trigger, m);
1590         return 0;
1591 }
1592
1593 /*
1594   send a ctdb message
1595 */
1596 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1597                              uint64_t srvid, TDB_DATA data)
1598 {
1599         struct ctdb_req_message_old *r;
1600         int len;
1601
1602         if (ctdb->methods == NULL) {
1603                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1604                 return -1;
1605         }
1606
1607         /* see if this is a message to ourselves */
1608         if (pnn == ctdb->pnn) {
1609                 return ctdb_local_message(ctdb, srvid, data);
1610         }
1611
1612         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1613         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1614                                     struct ctdb_req_message_old);
1615         CTDB_NO_MEMORY(ctdb, r);
1616
1617         r->hdr.destnode  = pnn;
1618         r->srvid         = srvid;
1619         r->datalen       = data.dsize;
1620         memcpy(&r->data[0], data.dptr, data.dsize);
1621
1622         ctdb_queue_packet(ctdb, &r->hdr);
1623
1624         talloc_free(r);
1625         return 0;
1626 }
1627
1628
1629
1630 struct ctdb_client_notify_list {
1631         struct ctdb_client_notify_list *next, *prev;
1632         struct ctdb_context *ctdb;
1633         uint64_t srvid;
1634         TDB_DATA data;
1635 };
1636
1637
1638 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1639 {
1640         int ret;
1641
1642         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1643
1644         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1645         if (ret != 0) {
1646                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1647         }
1648
1649         return 0;
1650 }
1651
1652 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1653 {
1654         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1655         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1656         struct ctdb_client_notify_list *nl;
1657
1658         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1659
1660         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1661                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1662                 return -1;
1663         }
1664
1665         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1666                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1667                 return -1;
1668         }
1669
1670
1671         if (client == NULL) {
1672                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1673                 return -1;
1674         }
1675
1676         for(nl=client->notify; nl; nl=nl->next) {
1677                 if (nl->srvid == notify->srvid) {
1678                         break;
1679                 }
1680         }
1681         if (nl != NULL) {
1682                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1683                 return -1;
1684         }
1685
1686         nl = talloc(client, struct ctdb_client_notify_list);
1687         CTDB_NO_MEMORY(ctdb, nl);
1688         nl->ctdb       = ctdb;
1689         nl->srvid      = notify->srvid;
1690         nl->data.dsize = notify->len;
1691         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1692                                        nl->data.dsize);
1693         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1694         
1695         DLIST_ADD(client->notify, nl);
1696         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1697
1698         return 0;
1699 }
1700
1701 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1702 {
1703         uint64_t srvid = *(uint64_t *)indata.dptr;
1704         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1705         struct ctdb_client_notify_list *nl;
1706
1707         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1708
1709         if (client == NULL) {
1710                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1711                 return -1;
1712         }
1713
1714         for(nl=client->notify; nl; nl=nl->next) {
1715                 if (nl->srvid == srvid) {
1716                         break;
1717                 }
1718         }
1719         if (nl == NULL) {
1720                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1721                 return -1;
1722         }
1723
1724         DLIST_REMOVE(client->notify, nl);
1725         talloc_set_destructor(nl, NULL);
1726         talloc_free(nl);
1727
1728         return 0;
1729 }
1730
1731 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1732 {
1733         struct ctdb_client_pid_list *client_pid;
1734
1735         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1736                 if (client_pid->pid == pid) {
1737                         return client_pid->client;
1738                 }
1739         }
1740         return NULL;
1741 }
1742
1743
1744 /* This control is used by samba when probing if a process (of a samba daemon)
1745    exists on the node.
1746    Samba does this when it needs/wants to check if a subrecord in one of the
1747    databases is still valied, or if it is stale and can be removed.
1748    If the node is in unhealthy or stopped state we just kill of the samba
1749    process holding this sub-record and return to the calling samba that
1750    the process does not exist.
1751    This allows us to forcefully recall subrecords registered by samba processes
1752    on banned and stopped nodes.
1753 */
1754 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1755 {
1756         struct ctdb_client *client;
1757
1758         client = ctdb_find_client_by_pid(ctdb, pid);
1759         if (client == NULL) {
1760                 return -1;
1761         }
1762
1763         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1764                 DEBUG(DEBUG_NOTICE,
1765                       ("Killing client with pid:%d on banned/stopped node\n",
1766                        (int)pid));
1767                 talloc_free(client);
1768                 return -1;
1769         }
1770
1771         return kill(pid, 0);
1772 }
1773
1774 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1775 {
1776         struct ctdb_node_map_old *node_map = NULL;
1777
1778         CHECK_CONTROL_DATA_SIZE(0);
1779
1780         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1781         if (node_map == NULL) {
1782                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1783                 return -1;
1784         }
1785
1786         outdata->dptr  = (unsigned char *)node_map;
1787         outdata->dsize = talloc_get_size(outdata->dptr);
1788
1789         return 0;
1790 }
1791
1792 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1793 {
1794         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1795                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1796                 return;
1797         }
1798
1799         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1800         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1801         ctdb_stop_recoverd(ctdb);
1802         ctdb_stop_keepalive(ctdb);
1803         ctdb_stop_monitoring(ctdb);
1804         ctdb_release_all_ips(ctdb);
1805         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1806         ctdb_stop_eventd(ctdb);
1807         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1808                 ctdb->methods->shutdown(ctdb);
1809         }
1810
1811         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1812         exit(exit_code);
1813 }
1814
1815 /* When forking the main daemon and the child process needs to connect
1816  * back to the daemon as a client process, this function can be used
1817  * to change the ctdb context from daemon into client mode.  The child
1818  * process must be created using ctdb_fork() and not fork() -
1819  * ctdb_fork() does some necessary housekeeping.
1820  */
1821 int switch_from_server_to_client(struct ctdb_context *ctdb)
1822 {
1823         int ret;
1824
1825         /* get a new event context */
1826         ctdb->ev = tevent_context_init(ctdb);
1827         if (ctdb->ev == NULL) {
1828                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1829                 exit(1);
1830         }
1831         tevent_loop_allow_nesting(ctdb->ev);
1832
1833         /* Connect to main CTDB daemon */
1834         ret = ctdb_socket_connect(ctdb);
1835         if (ret != 0) {
1836                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1837                 return -1;
1838         }
1839
1840         ctdb->can_send_controls = true;
1841
1842         return 0;
1843 }