Remove explicit include of lib/tevent/tevent.h.
[garming/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46
47
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
49                                   struct timeval t, void *private_data)
50 {
51         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
52
53         if (getpid() != ctdbd_pid) {
54                 return;
55         }
56
57         event_add_timed(ctdb->ev, ctdb, 
58                         timeval_current_ofs(1, 0), 
59                         ctdb_time_tick, ctdb);
60 }
61
62 /* Used to trigger a dummy event once per second, to make
63  * detection of hangs more reliable.
64  */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
66 {
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
75 {
76         if (ctdb->methods == NULL) {
77                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
79         }
80
81         /* start the transport running */
82         if (ctdb->methods->start(ctdb) != 0) {
83                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84                 ctdb_fatal(ctdb, "transport failed to start");
85         }
86
87         /* start the recovery daemon process */
88         if (ctdb_start_recoverd(ctdb) != 0) {
89                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
90                 exit(11);
91         }
92
93         /* Make sure we log something when the daemon terminates */
94         atexit(print_exit_message);
95
96         /* start monitoring for connected/disconnected nodes */
97         ctdb_start_keepalive(ctdb);
98
99         /* start monitoring for node health */
100         ctdb_start_monitoring(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void block_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
146                                     TDB_DATA data, void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message, data) + data.dsize;
154         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
155                                len, struct ctdb_req_message);
156         CTDB_NO_MEMORY_VOID(ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return ctdb_deregister_message_handler(ctdb, srvid, client);
205 }
206
207 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
208                         TDB_DATA *outdata)
209 {
210         uint64_t *ids;
211         int i, num_ids;
212         uint8_t *results;
213
214         if ((indata.dsize % sizeof(uint64_t)) != 0) {
215                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
216                                   "size=%d\n", (int)indata.dsize));
217                 return -1;
218         }
219
220         ids = (uint64_t *)indata.dptr;
221         num_ids = indata.dsize / 8;
222
223         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
224         if (results == NULL) {
225                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
226                 return -1;
227         }
228         for (i=0; i<num_ids; i++) {
229                 struct ctdb_message_list *ml;
230                 for (ml=ctdb->message_list; ml; ml=ml->next) {
231                         if (ml->srvid == ids[i]) {
232                                 break;
233                         }
234                 }
235                 if (ml != NULL) {
236                         results[i/8] |= (1 << (i%8));
237                 }
238         }
239         outdata->dptr = (uint8_t *)results;
240         outdata->dsize = talloc_get_size(results);
241         return 0;
242 }
243
244 /*
245   destroy a ctdb_client
246 */
247 static int ctdb_client_destructor(struct ctdb_client *client)
248 {
249         struct ctdb_db_context *ctdb_db;
250
251         ctdb_takeover_client_destructor_hook(client);
252         ctdb_reqid_remove(client->ctdb, client->client_id);
253         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
254
255         if (client->num_persistent_updates != 0) {
256                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
257                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
258         }
259         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
260         if (ctdb_db) {
261                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
262                                   "commit active. Forcing recovery.\n"));
263                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
264
265                 /* legacy trans2 transaction state: */
266                 ctdb_db->transaction_active = false;
267
268                 /*
269                  * trans3 transaction state:
270                  *
271                  * The destructor sets the pointer to NULL.
272                  */
273                 talloc_free(ctdb_db->persistent_state);
274         }
275
276         return 0;
277 }
278
279
280 /*
281   this is called when the ctdb daemon received a ctdb request message
282   from a local client over the unix domain socket
283  */
284 static void daemon_request_message_from_client(struct ctdb_client *client, 
285                                                struct ctdb_req_message *c)
286 {
287         TDB_DATA data;
288         int res;
289
290         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
291                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
292         }
293
294         /* maybe the message is for another client on this node */
295         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
296                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
297                 return;
298         }
299
300         /* its for a remote node */
301         data.dptr = &c->data[0];
302         data.dsize = c->datalen;
303         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
304                                        c->srvid, data);
305         if (res != 0) {
306                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
307                          c->hdr.destnode));
308         }
309 }
310
311
312 struct daemon_call_state {
313         struct ctdb_client *client;
314         uint32_t reqid;
315         struct ctdb_call *call;
316         struct timeval start_time;
317
318         /* readonly request ? */
319         uint32_t readonly_fetch;
320         uint32_t client_callid;
321 };
322
323 /* 
324    complete a call from a client 
325 */
326 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
327 {
328         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
329                                                            struct daemon_call_state);
330         struct ctdb_reply_call *r;
331         int res;
332         uint32_t length;
333         struct ctdb_client *client = dstate->client;
334         struct ctdb_db_context *ctdb_db = state->ctdb_db;
335
336         talloc_steal(client, dstate);
337         talloc_steal(dstate, dstate->call);
338
339         res = ctdb_daemon_call_recv(state, dstate->call);
340         if (res != 0) {
341                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
342                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343
344                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
345                 return;
346         }
347
348         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
349         /* If the client asked for readonly FETCH, we remapped this to 
350            FETCH_WITH_HEADER when calling the daemon. So we must
351            strip the extra header off the reply data before passing
352            it back to the client.
353         */
354         if (dstate->readonly_fetch
355         && dstate->client_callid == CTDB_FETCH_FUNC) {
356                 length -= sizeof(struct ctdb_ltdb_header);
357         }
358
359         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
360                                length, struct ctdb_reply_call);
361         if (r == NULL) {
362                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
363                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
364                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
365                 return;
366         }
367         r->hdr.reqid        = dstate->reqid;
368         r->status           = dstate->call->status;
369
370         if (dstate->readonly_fetch
371         && dstate->client_callid == CTDB_FETCH_FUNC) {
372                 /* client only asked for a FETCH so we must strip off
373                    the extra ctdb_ltdb header
374                 */
375                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
376                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
377         } else {
378                 r->datalen          = dstate->call->reply_data.dsize;
379                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
380         }
381
382         res = daemon_queue_send(client, &r->hdr);
383         if (res == -1) {
384                 /* client is dead - return immediately */
385                 return;
386         }
387         if (res != 0) {
388                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
389         }
390         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
391         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
392         talloc_free(dstate);
393 }
394
395 struct ctdb_daemon_packet_wrap {
396         struct ctdb_context *ctdb;
397         uint32_t client_id;
398 };
399
400 /*
401   a wrapper to catch disconnected clients
402  */
403 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
404 {
405         struct ctdb_client *client;
406         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
407                                                             struct ctdb_daemon_packet_wrap);
408         if (w == NULL) {
409                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
410                 return;
411         }
412
413         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
414         if (client == NULL) {
415                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
416                          w->client_id));
417                 talloc_free(w);
418                 return;
419         }
420         talloc_free(w);
421
422         /* process it */
423         daemon_incoming_packet(client, hdr);    
424 }
425
426 struct ctdb_deferred_fetch_call {
427         struct ctdb_deferred_fetch_call *next, *prev;
428         struct ctdb_req_call *c;
429         struct ctdb_daemon_packet_wrap *w;
430 };
431
432 struct ctdb_deferred_fetch_queue {
433         struct ctdb_deferred_fetch_call *deferred_calls;
434 };
435
436 struct ctdb_deferred_requeue {
437         struct ctdb_deferred_fetch_call *dfc;
438         struct ctdb_client *client;
439 };
440
441 /* called from a timer event and starts reprocessing the deferred call.*/
442 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
443                                        struct timeval t, void *private_data)
444 {
445         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
446         struct ctdb_client *client = dfr->client;
447
448         talloc_steal(client, dfr->dfc->c);
449         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
450         talloc_free(dfr);
451 }
452
453 /* the referral context is destroyed either after a timeout or when the initial
454    fetch-lock has finished.
455    at this stage, immediately start reprocessing the queued up deferred
456    calls so they get reprocessed immediately (and since we are dmaster at
457    this stage, trigger the waiting smbd processes to pick up and aquire the
458    record right away.
459 */
460 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
461 {
462
463         /* need to reprocess the packets from the queue explicitely instead of
464            just using a normal destructor since we want, need, to
465            call the clients in the same oder as the requests queued up
466         */
467         while (dfq->deferred_calls != NULL) {
468                 struct ctdb_client *client;
469                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
470                 struct ctdb_deferred_requeue *dfr;
471
472                 DLIST_REMOVE(dfq->deferred_calls, dfc);
473
474                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
475                 if (client == NULL) {
476                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
477                                  dfc->w->client_id));
478                         continue;
479                 }
480
481                 /* process it by pushing it back onto the eventloop */
482                 dfr = talloc(client, struct ctdb_deferred_requeue);
483                 if (dfr == NULL) {
484                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
485                         continue;
486                 }
487
488                 dfr->dfc    = talloc_steal(dfr, dfc);
489                 dfr->client = client;
490
491                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
492         }
493
494         return 0;
495 }
496
497 /* insert the new deferral context into the rb tree.
498    there should never be a pre-existing context here, but check for it
499    warn and destroy the previous context if there is already a deferral context
500    for this key.
501 */
502 static void *insert_dfq_callback(void *parm, void *data)
503 {
504         if (data) {
505                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
506                 talloc_free(data);
507         }
508         return parm;
509 }
510
511 /* if the original fetch-lock did not complete within a reasonable time,
512    free the context and context for all deferred requests to cause them to be
513    re-inserted into the event system.
514 */
515 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
516                                   struct timeval t, void *private_data)
517 {
518         talloc_free(private_data);
519 }
520
521 /* This function is used in the local daemon to register a KEY in a database
522    for being "fetched"
523    While the remote fetch is in-flight, any futher attempts to re-fetch the
524    same record will be deferred until the fetch completes.
525 */
526 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
527 {
528         uint32_t *k;
529         struct ctdb_deferred_fetch_queue *dfq;
530
531         k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
532         if (k == NULL) {
533                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
534                 return -1;
535         }
536
537         k[0] = (call->key.dsize + 3) / 4 + 1;
538         memcpy(&k[1], call->key.dptr, call->key.dsize);
539
540         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
541         if (dfq == NULL) {
542                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
543                 talloc_free(k);
544                 return -1;
545         }
546         dfq->deferred_calls = NULL;
547
548         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
549
550         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
551
552         /* if the fetch havent completed in 30 seconds, just tear it all down
553            and let it try again as the events are reissued */
554         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
555
556         talloc_free(k);
557         return 0;
558 }
559
560 /* check if this is a duplicate request to a fetch already in-flight
561    if it is, make this call deferred to be reprocessed later when
562    the in-flight fetch completes.
563 */
564 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
565 {
566         uint32_t *k;
567         struct ctdb_deferred_fetch_queue *dfq;
568         struct ctdb_deferred_fetch_call *dfc;
569
570         k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
571         if (k == NULL) {
572                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
573                 return -1;
574         }
575
576         k[0] = (key.dsize + 3) / 4 + 1;
577         memcpy(&k[1], key.dptr, key.dsize);
578
579         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
580         if (dfq == NULL) {
581                 talloc_free(k);
582                 return -1;
583         }
584
585
586         talloc_free(k);
587
588         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
589         if (dfc == NULL) {
590                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
591                 return -1;
592         }
593
594         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
595         if (dfc->w == NULL) {
596                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
597                 talloc_free(dfc);
598                 return -1;
599         }
600
601         dfc->c = talloc_steal(dfc, c);
602         dfc->w->ctdb = ctdb_db->ctdb;
603         dfc->w->client_id = client->client_id;
604
605         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
606
607         return 0;
608 }
609
610
611 /*
612   this is called when the ctdb daemon received a ctdb request call
613   from a local client over the unix domain socket
614  */
615 static void daemon_request_call_from_client(struct ctdb_client *client, 
616                                             struct ctdb_req_call *c)
617 {
618         struct ctdb_call_state *state;
619         struct ctdb_db_context *ctdb_db;
620         struct daemon_call_state *dstate;
621         struct ctdb_call *call;
622         struct ctdb_ltdb_header header;
623         TDB_DATA key, data;
624         int ret;
625         struct ctdb_context *ctdb = client->ctdb;
626         struct ctdb_daemon_packet_wrap *w;
627
628         CTDB_INCREMENT_STAT(ctdb, total_calls);
629         CTDB_DECREMENT_STAT(ctdb, pending_calls);
630
631         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
632         if (!ctdb_db) {
633                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
634                           c->db_id));
635                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
636                 return;
637         }
638
639         if (ctdb_db->unhealthy_reason) {
640                 /*
641                  * this is just a warning, as the tdb should be empty anyway,
642                  * and only persistent databases can be unhealthy, which doesn't
643                  * use this code patch
644                  */
645                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
646                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
647         }
648
649         key.dptr = c->data;
650         key.dsize = c->keylen;
651
652         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
653         CTDB_NO_MEMORY_VOID(ctdb, w);   
654
655         w->ctdb = ctdb;
656         w->client_id = client->client_id;
657
658         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
659                                            (struct ctdb_req_header *)c, &data,
660                                            daemon_incoming_packet_wrap, w, True);
661         if (ret == -2) {
662                 /* will retry later */
663                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664                 return;
665         }
666
667         talloc_free(w);
668
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
671                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
672                 return;
673         }
674
675
676         /* check if this fetch request is a duplicate for a
677            request we already have in flight. If so defer it until
678            the first request completes.
679         */
680         if (ctdb->tunable.fetch_collapse == 1) {
681                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
682                         ret = ctdb_ltdb_unlock(ctdb_db, key);
683                         if (ret != 0) {
684                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
685                         }
686                         return;
687                 }
688         }
689
690         /* Dont do READONLY if we dont have a tracking database */
691         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
692                 c->flags &= ~CTDB_WANT_READONLY;
693         }
694
695         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
696                 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
697                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
698                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
699                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
700                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
701                 }
702                 /* and clear out the tracking data */
703                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
704                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
705                 }
706         }
707
708         /* if we are revoking, we must defer all other calls until the revoke
709          * had completed.
710          */
711         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
712                 talloc_free(data.dptr);
713                 ret = ctdb_ltdb_unlock(ctdb_db, key);
714
715                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
716                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
717                 }
718                 return;
719         }
720
721         if ((header.dmaster == ctdb->pnn)
722         && (!(c->flags & CTDB_WANT_READONLY))
723         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
724                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
725                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
726                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
727                 }
728                 ret = ctdb_ltdb_unlock(ctdb_db, key);
729
730                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
731                         ctdb_fatal(ctdb, "Failed to start record revoke");
732                 }
733                 talloc_free(data.dptr);
734
735                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
736                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
737                 }
738
739                 return;
740         }               
741
742         dstate = talloc(client, struct daemon_call_state);
743         if (dstate == NULL) {
744                 ret = ctdb_ltdb_unlock(ctdb_db, key);
745                 if (ret != 0) {
746                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
747                 }
748
749                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
750                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
751                 return;
752         }
753         dstate->start_time = timeval_current();
754         dstate->client = client;
755         dstate->reqid  = c->hdr.reqid;
756         talloc_steal(dstate, data.dptr);
757
758         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
759         if (call == NULL) {
760                 ret = ctdb_ltdb_unlock(ctdb_db, key);
761                 if (ret != 0) {
762                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
763                 }
764
765                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
766                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
767                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
768                 return;
769         }
770
771         dstate->readonly_fetch = 0;
772         call->call_id = c->callid;
773         call->key = key;
774         call->call_data.dptr = c->data + c->keylen;
775         call->call_data.dsize = c->calldatalen;
776         call->flags = c->flags;
777
778         if (c->flags & CTDB_WANT_READONLY) {
779                 /* client wants readonly record, so translate this into a 
780                    fetch with header. remember what the client asked for
781                    so we can remap the reply back to the proper format for
782                    the client in the reply
783                  */
784                 dstate->client_callid = call->call_id;
785                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
786                 dstate->readonly_fetch = 1;
787         }
788
789         if (header.dmaster == ctdb->pnn) {
790                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
791         } else {
792                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
793                 if (ctdb->tunable.fetch_collapse == 1) {
794                         /* This request triggered a remote fetch-lock.
795                            set up a deferral for this key so any additional
796                            fetch-locks are deferred until the current one
797                            finishes.
798                          */
799                         setup_deferred_fetch_locks(ctdb_db, call);
800                 }
801         }
802
803         ret = ctdb_ltdb_unlock(ctdb_db, key);
804         if (ret != 0) {
805                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
806         }
807
808         if (state == NULL) {
809                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
810                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
811                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
812                 return;
813         }
814         talloc_steal(state, dstate);
815         talloc_steal(client, state);
816
817         state->async.fn = daemon_call_from_client_callback;
818         state->async.private_data = dstate;
819 }
820
821
822 static void daemon_request_control_from_client(struct ctdb_client *client, 
823                                                struct ctdb_req_control *c);
824
825 /* data contains a packet from the client */
826 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
827 {
828         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
829         TALLOC_CTX *tmp_ctx;
830         struct ctdb_context *ctdb = client->ctdb;
831
832         /* place the packet as a child of a tmp_ctx. We then use
833            talloc_free() below to free it. If any of the calls want
834            to keep it, then they will steal it somewhere else, and the
835            talloc_free() will be a no-op */
836         tmp_ctx = talloc_new(client);
837         talloc_steal(tmp_ctx, hdr);
838
839         if (hdr->ctdb_magic != CTDB_MAGIC) {
840                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
841                 goto done;
842         }
843
844         if (hdr->ctdb_version != CTDB_VERSION) {
845                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
846                 goto done;
847         }
848
849         switch (hdr->operation) {
850         case CTDB_REQ_CALL:
851                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
852                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
853                 break;
854
855         case CTDB_REQ_MESSAGE:
856                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
857                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
858                 break;
859
860         case CTDB_REQ_CONTROL:
861                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
862                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
863                 break;
864
865         default:
866                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
867                          hdr->operation));
868         }
869
870 done:
871         talloc_free(tmp_ctx);
872 }
873
874 /*
875   called when the daemon gets a incoming packet
876  */
877 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
878 {
879         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
880         struct ctdb_req_header *hdr;
881
882         if (cnt == 0) {
883                 talloc_free(client);
884                 return;
885         }
886
887         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
888
889         if (cnt < sizeof(*hdr)) {
890                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
891                                (unsigned)cnt);
892                 return;
893         }
894         hdr = (struct ctdb_req_header *)data;
895         if (cnt != hdr->length) {
896                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
897                                (unsigned)hdr->length, (unsigned)cnt);
898                 return;
899         }
900
901         if (hdr->ctdb_magic != CTDB_MAGIC) {
902                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
903                 return;
904         }
905
906         if (hdr->ctdb_version != CTDB_VERSION) {
907                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
908                 return;
909         }
910
911         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
912                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
913                  hdr->srcnode, hdr->destnode));
914
915         /* it is the responsibility of the incoming packet function to free 'data' */
916         daemon_incoming_packet(client, hdr);
917 }
918
919
920 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
921 {
922         if (client_pid->ctdb->client_pids != NULL) {
923                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
924         }
925
926         return 0;
927 }
928
929
930 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
931                          uint16_t flags, void *private_data)
932 {
933         struct sockaddr_un addr;
934         socklen_t len;
935         int fd;
936         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
937         struct ctdb_client *client;
938         struct ctdb_client_pid_list *client_pid;
939         pid_t peer_pid = 0;
940
941         memset(&addr, 0, sizeof(addr));
942         len = sizeof(addr);
943         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
944         if (fd == -1) {
945                 return;
946         }
947
948         set_nonblocking(fd);
949         set_close_on_exec(fd);
950
951         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
952
953         client = talloc_zero(ctdb, struct ctdb_client);
954         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
955                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
956         }
957
958         client->ctdb = ctdb;
959         client->fd = fd;
960         client->client_id = ctdb_reqid_new(ctdb, client);
961         client->pid = peer_pid;
962
963         client_pid = talloc(client, struct ctdb_client_pid_list);
964         if (client_pid == NULL) {
965                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
966                 close(fd);
967                 talloc_free(client);
968                 return;
969         }               
970         client_pid->ctdb   = ctdb;
971         client_pid->pid    = peer_pid;
972         client_pid->client = client;
973
974         DLIST_ADD(ctdb->client_pids, client_pid);
975
976         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
977                                          ctdb_daemon_read_cb, client,
978                                          "client-%u", client->pid);
979
980         talloc_set_destructor(client, ctdb_client_destructor);
981         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
982         CTDB_INCREMENT_STAT(ctdb, num_clients);
983 }
984
985
986
987 /*
988   create a unix domain socket and bind it
989   return a file descriptor open on the socket 
990 */
991 static int ux_socket_bind(struct ctdb_context *ctdb)
992 {
993         struct sockaddr_un addr;
994
995         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
996         if (ctdb->daemon.sd == -1) {
997                 return -1;
998         }
999
1000         set_close_on_exec(ctdb->daemon.sd);
1001         set_nonblocking(ctdb->daemon.sd);
1002
1003         memset(&addr, 0, sizeof(addr));
1004         addr.sun_family = AF_UNIX;
1005         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
1006
1007         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1008                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1009                 goto failed;
1010         }       
1011
1012         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1013             chmod(ctdb->daemon.name, 0700) != 0) {
1014                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1015                 goto failed;
1016         } 
1017
1018
1019         if (listen(ctdb->daemon.sd, 100) != 0) {
1020                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1021                 goto failed;
1022         }
1023
1024         return 0;
1025
1026 failed:
1027         close(ctdb->daemon.sd);
1028         ctdb->daemon.sd = -1;
1029         return -1;      
1030 }
1031
1032 static void sig_child_handler(struct event_context *ev,
1033         struct signal_event *se, int signum, int count,
1034         void *dont_care, 
1035         void *private_data)
1036 {
1037 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
1038         int status;
1039         pid_t pid = -1;
1040
1041         while (pid != 0) {
1042                 pid = waitpid(-1, &status, WNOHANG);
1043                 if (pid == -1) {
1044                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
1045                         return;
1046                 }
1047                 if (pid > 0) {
1048                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
1049                 }
1050         }
1051 }
1052
1053 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1054                                       void *private_data)
1055 {
1056         if (status != 0) {
1057                 ctdb_fatal(ctdb, "Failed to run setup event\n");
1058                 return;
1059         }
1060         ctdb_run_notification_script(ctdb, "setup");
1061
1062         /* tell all other nodes we've just started up */
1063         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1064                                  0, CTDB_CONTROL_STARTUP, 0,
1065                                  CTDB_CTRL_FLAG_NOREPLY,
1066                                  tdb_null, NULL, NULL);
1067 }
1068
1069 /*
1070   start the protocol going as a daemon
1071 */
1072 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
1073 {
1074         int res, ret = -1;
1075         struct fd_event *fde;
1076         const char *domain_socket_name;
1077         struct signal_event *se;
1078
1079         /* get rid of any old sockets */
1080         unlink(ctdb->daemon.name);
1081
1082         /* create a unix domain stream socket to listen to */
1083         res = ux_socket_bind(ctdb);
1084         if (res!=0) {
1085                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
1086                 exit(10);
1087         }
1088
1089         if (do_fork && fork()) {
1090                 return 0;
1091         }
1092
1093         tdb_reopen_all(False);
1094
1095         if (do_fork) {
1096                 setsid();
1097                 close(0);
1098                 if (open("/dev/null", O_RDONLY) != 0) {
1099                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1100                         exit(11);
1101                 }
1102         }
1103         block_signal(SIGPIPE);
1104
1105         ctdbd_pid = getpid();
1106         ctdb->ctdbd_pid = ctdbd_pid;
1107
1108
1109         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
1110
1111         if (ctdb->do_setsched) {
1112                 /* try to set us up as realtime */
1113                 ctdb_set_scheduler(ctdb);
1114         }
1115
1116         /* ensure the socket is deleted on exit of the daemon */
1117         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1118         if (domain_socket_name == NULL) {
1119                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1120                 exit(12);
1121         }
1122
1123         ctdb->ev = event_context_init(NULL);
1124         tevent_loop_allow_nesting(ctdb->ev);
1125         ret = ctdb_init_tevent_logging(ctdb);
1126         if (ret != 0) {
1127                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1128                 exit(1);
1129         }
1130
1131         ctdb_set_child_logging(ctdb);
1132
1133         /* initialize statistics collection */
1134         ctdb_statistics_init(ctdb);
1135
1136         /* force initial recovery for election */
1137         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1138
1139         if (strcmp(ctdb->transport, "tcp") == 0) {
1140                 int ctdb_tcp_init(struct ctdb_context *);
1141                 ret = ctdb_tcp_init(ctdb);
1142         }
1143 #ifdef USE_INFINIBAND
1144         if (strcmp(ctdb->transport, "ib") == 0) {
1145                 int ctdb_ibw_init(struct ctdb_context *);
1146                 ret = ctdb_ibw_init(ctdb);
1147         }
1148 #endif
1149         if (ret != 0) {
1150                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1151                 return -1;
1152         }
1153
1154         if (ctdb->methods == NULL) {
1155                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1156                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1157         }
1158
1159         /* initialise the transport  */
1160         if (ctdb->methods->initialise(ctdb) != 0) {
1161                 ctdb_fatal(ctdb, "transport failed to initialise");
1162         }
1163         if (public_address_list) {
1164                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
1165                 if (ret == -1) {
1166                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1167                         exit(1);
1168                 }
1169         }
1170
1171
1172         /* attach to existing databases */
1173         if (ctdb_attach_databases(ctdb) != 0) {
1174                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1175         }
1176
1177         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1178         if (ret != 0) {
1179                 ctdb_fatal(ctdb, "Failed to run init event\n");
1180         }
1181         ctdb_run_notification_script(ctdb, "init");
1182
1183         /* start frozen, then let the first election sort things out */
1184         if (ctdb_blocking_freeze(ctdb)) {
1185                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1186         }
1187
1188         /* now start accepting clients, only can do this once frozen */
1189         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1190                            EVENT_FD_READ,
1191                            ctdb_accept_client, ctdb);
1192         tevent_fd_set_auto_close(fde);
1193
1194         /* release any IPs we hold from previous runs of the daemon */
1195         if (ctdb->tunable.disable_ip_failover == 0) {
1196                 ctdb_release_all_ips(ctdb);
1197         }
1198
1199         /* start the transport going */
1200         ctdb_start_transport(ctdb);
1201
1202         /* set up a handler to pick up sigchld */
1203         se = event_add_signal(ctdb->ev, ctdb,
1204                                      SIGCHLD, 0,
1205                                      sig_child_handler,
1206                                      ctdb);
1207         if (se == NULL) {
1208                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1209                 exit(1);
1210         }
1211
1212         ret = ctdb_event_script_callback(ctdb,
1213                                          ctdb,
1214                                          ctdb_setup_event_callback,
1215                                          ctdb,
1216                                          false,
1217                                          CTDB_EVENT_SETUP,
1218                                          "%s",
1219                                          "");
1220         if (ret != 0) {
1221                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1222                 exit(1);
1223         }
1224
1225         if (use_syslog) {
1226                 if (start_syslog_daemon(ctdb)) {
1227                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1228                         exit(10);
1229                 }
1230         }
1231
1232         ctdb_lockdown_memory(ctdb);
1233           
1234         /* go into a wait loop to allow other nodes to complete */
1235         event_loop_wait(ctdb->ev);
1236
1237         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1238         exit(1);
1239 }
1240
1241 /*
1242   allocate a packet for use in daemon<->daemon communication
1243  */
1244 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1245                                                  TALLOC_CTX *mem_ctx, 
1246                                                  enum ctdb_operation operation, 
1247                                                  size_t length, size_t slength,
1248                                                  const char *type)
1249 {
1250         int size;
1251         struct ctdb_req_header *hdr;
1252
1253         length = MAX(length, slength);
1254         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1255
1256         if (ctdb->methods == NULL) {
1257                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1258                          operation, (unsigned)length));
1259                 return NULL;
1260         }
1261
1262         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1263         if (hdr == NULL) {
1264                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1265                          operation, (unsigned)length));
1266                 return NULL;
1267         }
1268         talloc_set_name_const(hdr, type);
1269         memset(hdr, 0, slength);
1270         hdr->length       = length;
1271         hdr->operation    = operation;
1272         hdr->ctdb_magic   = CTDB_MAGIC;
1273         hdr->ctdb_version = CTDB_VERSION;
1274         hdr->generation   = ctdb->vnn_map->generation;
1275         hdr->srcnode      = ctdb->pnn;
1276
1277         return hdr;     
1278 }
1279
1280 struct daemon_control_state {
1281         struct daemon_control_state *next, *prev;
1282         struct ctdb_client *client;
1283         struct ctdb_req_control *c;
1284         uint32_t reqid;
1285         struct ctdb_node *node;
1286 };
1287
1288 /*
1289   callback when a control reply comes in
1290  */
1291 static void daemon_control_callback(struct ctdb_context *ctdb,
1292                                     int32_t status, TDB_DATA data, 
1293                                     const char *errormsg,
1294                                     void *private_data)
1295 {
1296         struct daemon_control_state *state = talloc_get_type(private_data, 
1297                                                              struct daemon_control_state);
1298         struct ctdb_client *client = state->client;
1299         struct ctdb_reply_control *r;
1300         size_t len;
1301         int ret;
1302
1303         /* construct a message to send to the client containing the data */
1304         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1305         if (errormsg) {
1306                 len += strlen(errormsg);
1307         }
1308         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1309                                struct ctdb_reply_control);
1310         CTDB_NO_MEMORY_VOID(ctdb, r);
1311
1312         r->hdr.reqid     = state->reqid;
1313         r->status        = status;
1314         r->datalen       = data.dsize;
1315         r->errorlen = 0;
1316         memcpy(&r->data[0], data.dptr, data.dsize);
1317         if (errormsg) {
1318                 r->errorlen = strlen(errormsg);
1319                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1320         }
1321
1322         ret = daemon_queue_send(client, &r->hdr);
1323         if (ret != -1) {
1324                 talloc_free(state);
1325         }
1326 }
1327
1328 /*
1329   fail all pending controls to a disconnected node
1330  */
1331 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1332 {
1333         struct daemon_control_state *state;
1334         while ((state = node->pending_controls)) {
1335                 DLIST_REMOVE(node->pending_controls, state);
1336                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1337                                         "node is disconnected", state);
1338         }
1339 }
1340
1341 /*
1342   destroy a daemon_control_state
1343  */
1344 static int daemon_control_destructor(struct daemon_control_state *state)
1345 {
1346         if (state->node) {
1347                 DLIST_REMOVE(state->node->pending_controls, state);
1348         }
1349         return 0;
1350 }
1351
1352 /*
1353   this is called when the ctdb daemon received a ctdb request control
1354   from a local client over the unix domain socket
1355  */
1356 static void daemon_request_control_from_client(struct ctdb_client *client, 
1357                                                struct ctdb_req_control *c)
1358 {
1359         TDB_DATA data;
1360         int res;
1361         struct daemon_control_state *state;
1362         TALLOC_CTX *tmp_ctx = talloc_new(client);
1363
1364         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1365                 c->hdr.destnode = client->ctdb->pnn;
1366         }
1367
1368         state = talloc(client, struct daemon_control_state);
1369         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1370
1371         state->client = client;
1372         state->c = talloc_steal(state, c);
1373         state->reqid = c->hdr.reqid;
1374         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1375                 state->node = client->ctdb->nodes[c->hdr.destnode];
1376                 DLIST_ADD(state->node->pending_controls, state);
1377         } else {
1378                 state->node = NULL;
1379         }
1380
1381         talloc_set_destructor(state, daemon_control_destructor);
1382
1383         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1384                 talloc_steal(tmp_ctx, state);
1385         }
1386         
1387         data.dptr = &c->data[0];
1388         data.dsize = c->datalen;
1389         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1390                                        c->srvid, c->opcode, client->client_id,
1391                                        c->flags,
1392                                        data, daemon_control_callback,
1393                                        state);
1394         if (res != 0) {
1395                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1396                          c->hdr.destnode));
1397         }
1398
1399         talloc_free(tmp_ctx);
1400 }
1401
1402 /*
1403   register a call function
1404 */
1405 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1406                          ctdb_fn_t fn, int id)
1407 {
1408         struct ctdb_registered_call *call;
1409         struct ctdb_db_context *ctdb_db;
1410
1411         ctdb_db = find_ctdb_db(ctdb, db_id);
1412         if (ctdb_db == NULL) {
1413                 return -1;
1414         }
1415
1416         call = talloc(ctdb_db, struct ctdb_registered_call);
1417         call->fn = fn;
1418         call->id = id;
1419
1420         DLIST_ADD(ctdb_db->calls, call);        
1421         return 0;
1422 }
1423
1424
1425
1426 /*
1427   this local messaging handler is ugly, but is needed to prevent
1428   recursion in ctdb_send_message() when the destination node is the
1429   same as the source node
1430  */
1431 struct ctdb_local_message {
1432         struct ctdb_context *ctdb;
1433         uint64_t srvid;
1434         TDB_DATA data;
1435 };
1436
1437 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1438                                        struct timeval t, void *private_data)
1439 {
1440         struct ctdb_local_message *m = talloc_get_type(private_data, 
1441                                                        struct ctdb_local_message);
1442         int res;
1443
1444         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1445         if (res != 0) {
1446                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1447                           (unsigned long long)m->srvid));
1448         }
1449         talloc_free(m);
1450 }
1451
1452 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1453 {
1454         struct ctdb_local_message *m;
1455         m = talloc(ctdb, struct ctdb_local_message);
1456         CTDB_NO_MEMORY(ctdb, m);
1457
1458         m->ctdb = ctdb;
1459         m->srvid = srvid;
1460         m->data  = data;
1461         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1462         if (m->data.dptr == NULL) {
1463                 talloc_free(m);
1464                 return -1;
1465         }
1466
1467         /* this needs to be done as an event to prevent recursion */
1468         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1469         return 0;
1470 }
1471
1472 /*
1473   send a ctdb message
1474 */
1475 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1476                              uint64_t srvid, TDB_DATA data)
1477 {
1478         struct ctdb_req_message *r;
1479         int len;
1480
1481         if (ctdb->methods == NULL) {
1482                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1483                 return -1;
1484         }
1485
1486         /* see if this is a message to ourselves */
1487         if (pnn == ctdb->pnn) {
1488                 return ctdb_local_message(ctdb, srvid, data);
1489         }
1490
1491         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1492         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1493                                     struct ctdb_req_message);
1494         CTDB_NO_MEMORY(ctdb, r);
1495
1496         r->hdr.destnode  = pnn;
1497         r->srvid         = srvid;
1498         r->datalen       = data.dsize;
1499         memcpy(&r->data[0], data.dptr, data.dsize);
1500
1501         ctdb_queue_packet(ctdb, &r->hdr);
1502
1503         talloc_free(r);
1504         return 0;
1505 }
1506
1507
1508
1509 struct ctdb_client_notify_list {
1510         struct ctdb_client_notify_list *next, *prev;
1511         struct ctdb_context *ctdb;
1512         uint64_t srvid;
1513         TDB_DATA data;
1514 };
1515
1516
1517 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1518 {
1519         int ret;
1520
1521         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1522
1523         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1524         if (ret != 0) {
1525                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1526         }
1527
1528         return 0;
1529 }
1530
1531 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1532 {
1533         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1534         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1535         struct ctdb_client_notify_list *nl;
1536
1537         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1538
1539         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1540                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1541                 return -1;
1542         }
1543
1544         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1545                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1546                 return -1;
1547         }
1548
1549
1550         if (client == NULL) {
1551                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1552                 return -1;
1553         }
1554
1555         for(nl=client->notify; nl; nl=nl->next) {
1556                 if (nl->srvid == notify->srvid) {
1557                         break;
1558                 }
1559         }
1560         if (nl != NULL) {
1561                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1562                 return -1;
1563         }
1564
1565         nl = talloc(client, struct ctdb_client_notify_list);
1566         CTDB_NO_MEMORY(ctdb, nl);
1567         nl->ctdb       = ctdb;
1568         nl->srvid      = notify->srvid;
1569         nl->data.dsize = notify->len;
1570         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1571         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1572         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1573         
1574         DLIST_ADD(client->notify, nl);
1575         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1576
1577         return 0;
1578 }
1579
1580 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1581 {
1582         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1583         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1584         struct ctdb_client_notify_list *nl;
1585
1586         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1587
1588         if (client == NULL) {
1589                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1590                 return -1;
1591         }
1592
1593         for(nl=client->notify; nl; nl=nl->next) {
1594                 if (nl->srvid == notify->srvid) {
1595                         break;
1596                 }
1597         }
1598         if (nl == NULL) {
1599                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1600                 return -1;
1601         }
1602
1603         DLIST_REMOVE(client->notify, nl);
1604         talloc_set_destructor(nl, NULL);
1605         talloc_free(nl);
1606
1607         return 0;
1608 }
1609
1610 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1611 {
1612         struct ctdb_client_pid_list *client_pid;
1613
1614         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1615                 if (client_pid->pid == pid) {
1616                         return client_pid->client;
1617                 }
1618         }
1619         return NULL;
1620 }
1621
1622
1623 /* This control is used by samba when probing if a process (of a samba daemon)
1624    exists on the node.
1625    Samba does this when it needs/wants to check if a subrecord in one of the
1626    databases is still valied, or if it is stale and can be removed.
1627    If the node is in unhealthy or stopped state we just kill of the samba
1628    process holding htis sub-record and return to the calling samba that
1629    the process does not exist.
1630    This allows us to forcefully recall subrecords registered by samba processes
1631    on banned and stopped nodes.
1632 */
1633 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1634 {
1635         struct ctdb_client *client;
1636
1637         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1638                 client = ctdb_find_client_by_pid(ctdb, pid);
1639                 if (client != NULL) {
1640                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1641                         talloc_free(client);
1642                 }
1643                 return -1;
1644         }
1645
1646         return kill(pid, 0);
1647 }