Move platform-specific code to common/system_*
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
41
42 static void print_exit_message(void)
43 {
44         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
45 }
46
47
48
49 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
50                                   struct timeval t, void *private_data)
51 {
52         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
53
54         if (getpid() != ctdbd_pid) {
55                 return;
56         }
57
58         event_add_timed(ctdb->ev, ctdb, 
59                         timeval_current_ofs(1, 0), 
60                         ctdb_time_tick, ctdb);
61 }
62
63 /* Used to trigger a dummy event once per second, to make
64  * detection of hangs more reliable.
65  */
66 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
67 {
68         event_add_timed(ctdb->ev, ctdb, 
69                         timeval_current_ofs(1, 0), 
70                         ctdb_time_tick, ctdb);
71 }
72
73
74 /* called when the "startup" event script has finished */
75 static void ctdb_start_transport(struct ctdb_context *ctdb)
76 {
77         if (ctdb->methods == NULL) {
78                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
79                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
80         }
81
82         /* start the transport running */
83         if (ctdb->methods->start(ctdb) != 0) {
84                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
85                 ctdb_fatal(ctdb, "transport failed to start");
86         }
87
88         /* start the recovery daemon process */
89         if (ctdb_start_recoverd(ctdb) != 0) {
90                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
91                 exit(11);
92         }
93
94         /* Make sure we log something when the daemon terminates */
95         atexit(print_exit_message);
96
97         /* start monitoring for connected/disconnected nodes */
98         ctdb_start_keepalive(ctdb);
99
100         /* start monitoring for node health */
101         ctdb_start_monitoring(ctdb);
102
103         /* start periodic update of tcp tickle lists */
104         ctdb_start_tcp_tickle_update(ctdb);
105
106         /* start listening for recovery daemon pings */
107         ctdb_control_recd_ping(ctdb);
108
109         /* start listening to timer ticks */
110         ctdb_start_time_tickd(ctdb);
111 }
112
113 static void block_signal(int signum)
114 {
115         struct sigaction act;
116
117         memset(&act, 0, sizeof(act));
118
119         act.sa_handler = SIG_IGN;
120         sigemptyset(&act.sa_mask);
121         sigaddset(&act.sa_mask, signum);
122         sigaction(signum, &act, NULL);
123 }
124
125
126 /*
127   send a packet to a client
128  */
129 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
130 {
131         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
132         if (hdr->operation == CTDB_REQ_MESSAGE) {
133                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
134                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
135                         talloc_free(client);
136                         return -1;
137                 }
138         }
139         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
140 }
141
142 /*
143   message handler for when we are in daemon mode. This redirects the message
144   to the right client
145  */
146 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
147                                     TDB_DATA data, void *private_data)
148 {
149         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
150         struct ctdb_req_message *r;
151         int len;
152
153         /* construct a message to send to the client containing the data */
154         len = offsetof(struct ctdb_req_message, data) + data.dsize;
155         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
156                                len, struct ctdb_req_message);
157         CTDB_NO_MEMORY_VOID(ctdb, r);
158
159         talloc_set_name_const(r, "req_message packet");
160
161         r->srvid         = srvid;
162         r->datalen       = data.dsize;
163         memcpy(&r->data[0], data.dptr, data.dsize);
164
165         daemon_queue_send(client, &r->hdr);
166
167         talloc_free(r);
168 }
169
170 /*
171   this is called when the ctdb daemon received a ctdb request to 
172   set the srvid from the client
173  */
174 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
175 {
176         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
177         int res;
178         if (client == NULL) {
179                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
180                 return -1;
181         }
182         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
183         if (res != 0) {
184                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
185                          (unsigned long long)srvid));
186         } else {
187                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
188                          (unsigned long long)srvid));
189         }
190
191         return res;
192 }
193
194 /*
195   this is called when the ctdb daemon received a ctdb request to 
196   remove a srvid from the client
197  */
198 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 {
200         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
201         if (client == NULL) {
202                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
203                 return -1;
204         }
205         return ctdb_deregister_message_handler(ctdb, srvid, client);
206 }
207
208 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
209                         TDB_DATA *outdata)
210 {
211         uint64_t *ids;
212         int i, num_ids;
213         uint8_t *results;
214
215         if ((indata.dsize % sizeof(uint64_t)) != 0) {
216                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
217                                   "size=%d\n", (int)indata.dsize));
218                 return -1;
219         }
220
221         ids = (uint64_t *)indata.dptr;
222         num_ids = indata.dsize / 8;
223
224         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
225         if (results == NULL) {
226                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
227                 return -1;
228         }
229         for (i=0; i<num_ids; i++) {
230                 struct ctdb_message_list *ml;
231                 for (ml=ctdb->message_list; ml; ml=ml->next) {
232                         if (ml->srvid == ids[i]) {
233                                 break;
234                         }
235                 }
236                 if (ml != NULL) {
237                         results[i/8] |= (1 << (i%8));
238                 }
239         }
240         outdata->dptr = (uint8_t *)results;
241         outdata->dsize = talloc_get_size(results);
242         return 0;
243 }
244
245 /*
246   destroy a ctdb_client
247 */
248 static int ctdb_client_destructor(struct ctdb_client *client)
249 {
250         struct ctdb_db_context *ctdb_db;
251
252         ctdb_takeover_client_destructor_hook(client);
253         ctdb_reqid_remove(client->ctdb, client->client_id);
254         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
255
256         if (client->num_persistent_updates != 0) {
257                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
258                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
259         }
260         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
261         if (ctdb_db) {
262                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
263                                   "commit active. Forcing recovery.\n"));
264                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
265
266                 /* legacy trans2 transaction state: */
267                 ctdb_db->transaction_active = false;
268
269                 /*
270                  * trans3 transaction state:
271                  *
272                  * The destructor sets the pointer to NULL.
273                  */
274                 talloc_free(ctdb_db->persistent_state);
275         }
276
277         return 0;
278 }
279
280
281 /*
282   this is called when the ctdb daemon received a ctdb request message
283   from a local client over the unix domain socket
284  */
285 static void daemon_request_message_from_client(struct ctdb_client *client, 
286                                                struct ctdb_req_message *c)
287 {
288         TDB_DATA data;
289         int res;
290
291         /* maybe the message is for another client on this node */
292         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
293                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
294                 return;
295         }
296
297         /* its for a remote node */
298         data.dptr = &c->data[0];
299         data.dsize = c->datalen;
300         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
301                                        c->srvid, data);
302         if (res != 0) {
303                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
304                          c->hdr.destnode));
305         }
306 }
307
308
309 struct daemon_call_state {
310         struct ctdb_client *client;
311         uint32_t reqid;
312         struct ctdb_call *call;
313         struct timeval start_time;
314 };
315
316 /* 
317    complete a call from a client 
318 */
319 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
320 {
321         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
322                                                            struct daemon_call_state);
323         struct ctdb_reply_call *r;
324         int res;
325         uint32_t length;
326         struct ctdb_client *client = dstate->client;
327         struct ctdb_db_context *ctdb_db = state->ctdb_db;
328
329         talloc_steal(client, dstate);
330         talloc_steal(dstate, dstate->call);
331
332         res = ctdb_daemon_call_recv(state, dstate->call);
333         if (res != 0) {
334                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
335                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
336
337                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
338                 return;
339         }
340
341         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
342         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
343                                length, struct ctdb_reply_call);
344         if (r == NULL) {
345                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
346                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
347                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
348                 return;
349         }
350         r->hdr.reqid        = dstate->reqid;
351         r->datalen          = dstate->call->reply_data.dsize;
352         r->status           = dstate->call->status;
353         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
354
355         res = daemon_queue_send(client, &r->hdr);
356         if (res == -1) {
357                 /* client is dead - return immediately */
358                 return;
359         }
360         if (res != 0) {
361                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
362         }
363         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
364         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
365         talloc_free(dstate);
366 }
367
368 struct ctdb_daemon_packet_wrap {
369         struct ctdb_context *ctdb;
370         uint32_t client_id;
371 };
372
373 /*
374   a wrapper to catch disconnected clients
375  */
376 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
377 {
378         struct ctdb_client *client;
379         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
380                                                             struct ctdb_daemon_packet_wrap);
381         if (w == NULL) {
382                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
383                 return;
384         }
385
386         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
387         if (client == NULL) {
388                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
389                          w->client_id));
390                 talloc_free(w);
391                 return;
392         }
393         talloc_free(w);
394
395         /* process it */
396         daemon_incoming_packet(client, hdr);    
397 }
398
399 struct ctdb_deferred_fetch_call {
400         struct ctdb_deferred_fetch_call *next, *prev;
401         struct ctdb_req_call *c;
402         struct ctdb_daemon_packet_wrap *w;
403 };
404
405 struct ctdb_deferred_fetch_queue {
406         struct ctdb_deferred_fetch_call *deferred_calls;
407 };
408
409 struct ctdb_deferred_requeue {
410         struct ctdb_deferred_fetch_call *dfc;
411         struct ctdb_client *client;
412 };
413
414 /* called from a timer event and starts reprocessing the deferred call.*/
415 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
416                                        struct timeval t, void *private_data)
417 {
418         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
419         struct ctdb_client *client = dfr->client;
420
421         talloc_steal(client, dfr->dfc->c);
422         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
423         talloc_free(dfr);
424 }
425
426 /* the referral context is destroyed either after a timeout or when the initial
427    fetch-lock has finished.
428    at this stage, immediately start reprocessing the queued up deferred
429    calls so they get reprocessed immediately (and since we are dmaster at
430    this stage, trigger the waiting smbd processes to pick up and aquire the
431    record right away.
432 */
433 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
434 {
435
436         /* need to reprocess the packets from the queue explicitely instead of
437            just using a normal destructor since we want, need, to
438            call the clients in the same oder as the requests queued up
439         */
440         while (dfq->deferred_calls != NULL) {
441                 struct ctdb_client *client;
442                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
443                 struct ctdb_deferred_requeue *dfr;
444
445                 DLIST_REMOVE(dfq->deferred_calls, dfc);
446
447                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
448                 if (client == NULL) {
449                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
450                                  dfc->w->client_id));
451                         continue;
452                 }
453
454                 /* process it by pushing it back onto the eventloop */
455                 dfr = talloc(client, struct ctdb_deferred_requeue);
456                 if (dfr == NULL) {
457                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
458                         continue;
459                 }
460
461                 dfr->dfc    = talloc_steal(dfr, dfc);
462                 dfr->client = client;
463
464                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
465         }
466
467         return 0;
468 }
469
470 /* insert the new deferral context into the rb tree.
471    there should never be a pre-existing context here, but check for it
472    warn and destroy the previous context if there is already a deferral context
473    for this key.
474 */
475 static void *insert_dfq_callback(void *parm, void *data)
476 {
477         if (data) {
478                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
479                 talloc_free(data);
480         }
481         return parm;
482 }
483
484 /* if the original fetch-lock did not complete within a reasonable time,
485    free the context and context for all deferred requests to cause them to be
486    re-inserted into the event system.
487 */
488 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
489                                   struct timeval t, void *private_data)
490 {
491         talloc_free(private_data);
492 }
493
494 /* This function is used in the local daemon to register a KEY in a database
495    for being "fetched"
496    While the remote fetch is in-flight, any futher attempts to re-fetch the
497    same record will be deferred until the fetch completes.
498 */
499 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
500 {
501         uint32_t *k;
502         struct ctdb_deferred_fetch_queue *dfq;
503
504         k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
505         if (k == NULL) {
506                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
507                 return -1;
508         }
509
510         k[0] = (call->key.dsize + 3) / 4 + 1;
511         memcpy(&k[1], call->key.dptr, call->key.dsize);
512
513         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
514         if (dfq == NULL) {
515                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
516                 talloc_free(k);
517                 return -1;
518         }
519         dfq->deferred_calls = NULL;
520
521         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
522
523         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
524
525         /* if the fetch havent completed in 30 seconds, just tear it all down
526            and let it try again as the events are reissued */
527         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
528
529         talloc_free(k);
530         return 0;
531 }
532
533 /* check if this is a duplicate request to a fetch already in-flight
534    if it is, make this call deferred to be reprocessed later when
535    the in-flight fetch completes.
536 */
537 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
538 {
539         uint32_t *k;
540         struct ctdb_deferred_fetch_queue *dfq;
541         struct ctdb_deferred_fetch_call *dfc;
542
543         k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
546                 return -1;
547         }
548
549         k[0] = (key.dsize + 3) / 4 + 1;
550         memcpy(&k[1], key.dptr, key.dsize);
551
552         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
553         if (dfq == NULL) {
554                 talloc_free(k);
555                 return -1;
556         }
557
558
559         talloc_free(k);
560
561         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
562         if (dfc == NULL) {
563                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
564                 return -1;
565         }
566
567         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
568         if (dfc->w == NULL) {
569                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
570                 talloc_free(dfc);
571                 return -1;
572         }
573
574         dfc->c = talloc_steal(dfc, c);
575         dfc->w->ctdb = ctdb_db->ctdb;
576         dfc->w->client_id = client->client_id;
577
578         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
579
580         return 0;
581 }
582
583
584 /*
585   this is called when the ctdb daemon received a ctdb request call
586   from a local client over the unix domain socket
587  */
588 static void daemon_request_call_from_client(struct ctdb_client *client, 
589                                             struct ctdb_req_call *c)
590 {
591         struct ctdb_call_state *state;
592         struct ctdb_db_context *ctdb_db;
593         struct daemon_call_state *dstate;
594         struct ctdb_call *call;
595         struct ctdb_ltdb_header header;
596         TDB_DATA key, data;
597         int ret;
598         struct ctdb_context *ctdb = client->ctdb;
599         struct ctdb_daemon_packet_wrap *w;
600
601         CTDB_INCREMENT_STAT(ctdb, total_calls);
602         CTDB_DECREMENT_STAT(ctdb, pending_calls);
603
604         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
605         if (!ctdb_db) {
606                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
607                           c->db_id));
608                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
609                 return;
610         }
611
612         if (ctdb_db->unhealthy_reason) {
613                 /*
614                  * this is just a warning, as the tdb should be empty anyway,
615                  * and only persistent databases can be unhealthy, which doesn't
616                  * use this code patch
617                  */
618                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
619                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
620         }
621
622         key.dptr = c->data;
623         key.dsize = c->keylen;
624
625         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
626         CTDB_NO_MEMORY_VOID(ctdb, w);   
627
628         w->ctdb = ctdb;
629         w->client_id = client->client_id;
630
631         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
632                                            (struct ctdb_req_header *)c, &data,
633                                            daemon_incoming_packet_wrap, w, True);
634         if (ret == -2) {
635                 /* will retry later */
636                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
637                 return;
638         }
639
640         talloc_free(w);
641
642         if (ret != 0) {
643                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
644                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
645                 return;
646         }
647
648         if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
649                 /* check if this fetch-lock request is a duplicate for a
650                    request we already have in flight. If so defer it until
651                    the first request completes.
652                  */
653                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
654                         ret = ctdb_ltdb_unlock(ctdb_db, key);
655                         if (ret != 0) {
656                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
657                         }
658                         return;
659                 }
660         }
661
662         /* Dont do READONLY if we dont have a tracking database */
663         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
664                 c->flags &= ~CTDB_WANT_READONLY;
665         }
666
667         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
668                 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
669                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
670                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
671                 }
672                 /* and clear out the tracking data */
673                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
674                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
675                 }
676         }
677
678         /* if we are revoking, we must defer all other calls until the revoke
679          * had completed.
680          */
681         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
682                 talloc_free(data.dptr);
683                 ret = ctdb_ltdb_unlock(ctdb_db, key);
684
685                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
686                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
687                 }
688                 return;
689         }
690
691         if ((header.dmaster == ctdb->pnn)
692         && (!(c->flags & CTDB_WANT_READONLY))
693         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
694                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
695                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
696                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
697                 }
698                 ret = ctdb_ltdb_unlock(ctdb_db, key);
699
700                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
701                         ctdb_fatal(ctdb, "Failed to start record revoke");
702                 }
703                 talloc_free(data.dptr);
704
705                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
706                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
707                 }
708
709                 return;
710         }               
711
712         dstate = talloc(client, struct daemon_call_state);
713         if (dstate == NULL) {
714                 ret = ctdb_ltdb_unlock(ctdb_db, key);
715                 if (ret != 0) {
716                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
717                 }
718
719                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
720                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
721                 return;
722         }
723         dstate->start_time = timeval_current();
724         dstate->client = client;
725         dstate->reqid  = c->hdr.reqid;
726         talloc_steal(dstate, data.dptr);
727
728         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
729         if (call == NULL) {
730                 ret = ctdb_ltdb_unlock(ctdb_db, key);
731                 if (ret != 0) {
732                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
733                 }
734
735                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
736                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
737                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
738                 return;
739         }
740
741         call->call_id = c->callid;
742         call->key = key;
743         call->call_data.dptr = c->data + c->keylen;
744         call->call_data.dsize = c->calldatalen;
745         call->flags = c->flags;
746
747         if (header.dmaster == ctdb->pnn) {
748                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
749         } else {
750                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
751                 if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
752                         /* This request triggered a remote fetch-lock.
753                            set up a deferral for this key so any additional
754                            fetch-locks are deferred until the current one
755                            finishes.
756                          */
757                         setup_deferred_fetch_locks(ctdb_db, call);
758                 }
759         }
760
761         ret = ctdb_ltdb_unlock(ctdb_db, key);
762         if (ret != 0) {
763                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
764         }
765
766         if (state == NULL) {
767                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
768                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
769                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
770                 return;
771         }
772         talloc_steal(state, dstate);
773         talloc_steal(client, state);
774
775         state->async.fn = daemon_call_from_client_callback;
776         state->async.private_data = dstate;
777 }
778
779
780 static void daemon_request_control_from_client(struct ctdb_client *client, 
781                                                struct ctdb_req_control *c);
782
783 /* data contains a packet from the client */
784 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
785 {
786         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
787         TALLOC_CTX *tmp_ctx;
788         struct ctdb_context *ctdb = client->ctdb;
789
790         /* place the packet as a child of a tmp_ctx. We then use
791            talloc_free() below to free it. If any of the calls want
792            to keep it, then they will steal it somewhere else, and the
793            talloc_free() will be a no-op */
794         tmp_ctx = talloc_new(client);
795         talloc_steal(tmp_ctx, hdr);
796
797         if (hdr->ctdb_magic != CTDB_MAGIC) {
798                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
799                 goto done;
800         }
801
802         if (hdr->ctdb_version != CTDB_VERSION) {
803                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
804                 goto done;
805         }
806
807         switch (hdr->operation) {
808         case CTDB_REQ_CALL:
809                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
810                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
811                 break;
812
813         case CTDB_REQ_MESSAGE:
814                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
815                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
816                 break;
817
818         case CTDB_REQ_CONTROL:
819                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
820                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
821                 break;
822
823         default:
824                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
825                          hdr->operation));
826         }
827
828 done:
829         talloc_free(tmp_ctx);
830 }
831
832 /*
833   called when the daemon gets a incoming packet
834  */
835 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
836 {
837         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
838         struct ctdb_req_header *hdr;
839
840         if (cnt == 0) {
841                 talloc_free(client);
842                 return;
843         }
844
845         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
846
847         if (cnt < sizeof(*hdr)) {
848                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
849                                (unsigned)cnt);
850                 return;
851         }
852         hdr = (struct ctdb_req_header *)data;
853         if (cnt != hdr->length) {
854                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
855                                (unsigned)hdr->length, (unsigned)cnt);
856                 return;
857         }
858
859         if (hdr->ctdb_magic != CTDB_MAGIC) {
860                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
861                 return;
862         }
863
864         if (hdr->ctdb_version != CTDB_VERSION) {
865                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
866                 return;
867         }
868
869         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
870                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
871                  hdr->srcnode, hdr->destnode));
872
873         /* it is the responsibility of the incoming packet function to free 'data' */
874         daemon_incoming_packet(client, hdr);
875 }
876
877
878 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
879 {
880         if (client_pid->ctdb->client_pids != NULL) {
881                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
882         }
883
884         return 0;
885 }
886
887
888 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
889                          uint16_t flags, void *private_data)
890 {
891         struct sockaddr_un addr;
892         socklen_t len;
893         int fd;
894         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
895         struct ctdb_client *client;
896         struct ctdb_client_pid_list *client_pid;
897         pid_t peer_pid = 0;
898
899         memset(&addr, 0, sizeof(addr));
900         len = sizeof(addr);
901         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
902         if (fd == -1) {
903                 return;
904         }
905
906         set_nonblocking(fd);
907         set_close_on_exec(fd);
908
909         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
910
911         client = talloc_zero(ctdb, struct ctdb_client);
912         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
913                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
914         }
915
916         client->ctdb = ctdb;
917         client->fd = fd;
918         client->client_id = ctdb_reqid_new(ctdb, client);
919         client->pid = peer_pid;
920
921         client_pid = talloc(client, struct ctdb_client_pid_list);
922         if (client_pid == NULL) {
923                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
924                 close(fd);
925                 talloc_free(client);
926                 return;
927         }               
928         client_pid->ctdb   = ctdb;
929         client_pid->pid    = peer_pid;
930         client_pid->client = client;
931
932         DLIST_ADD(ctdb->client_pids, client_pid);
933
934         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
935                                          ctdb_daemon_read_cb, client,
936                                          "client-%u", client->pid);
937
938         talloc_set_destructor(client, ctdb_client_destructor);
939         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
940         CTDB_INCREMENT_STAT(ctdb, num_clients);
941 }
942
943
944
945 /*
946   create a unix domain socket and bind it
947   return a file descriptor open on the socket 
948 */
949 static int ux_socket_bind(struct ctdb_context *ctdb)
950 {
951         struct sockaddr_un addr;
952
953         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
954         if (ctdb->daemon.sd == -1) {
955                 return -1;
956         }
957
958         set_close_on_exec(ctdb->daemon.sd);
959         set_nonblocking(ctdb->daemon.sd);
960
961         memset(&addr, 0, sizeof(addr));
962         addr.sun_family = AF_UNIX;
963         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
964
965         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
966                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
967                 goto failed;
968         }       
969
970         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
971             chmod(ctdb->daemon.name, 0700) != 0) {
972                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
973                 goto failed;
974         } 
975
976
977         if (listen(ctdb->daemon.sd, 100) != 0) {
978                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
979                 goto failed;
980         }
981
982         return 0;
983
984 failed:
985         close(ctdb->daemon.sd);
986         ctdb->daemon.sd = -1;
987         return -1;      
988 }
989
990 static void sig_child_handler(struct event_context *ev,
991         struct signal_event *se, int signum, int count,
992         void *dont_care, 
993         void *private_data)
994 {
995 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
996         int status;
997         pid_t pid = -1;
998
999         while (pid != 0) {
1000                 pid = waitpid(-1, &status, WNOHANG);
1001                 if (pid == -1) {
1002                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
1003                         return;
1004                 }
1005                 if (pid > 0) {
1006                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
1007                 }
1008         }
1009 }
1010
1011 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1012                                       void *private_data)
1013 {
1014         if (status != 0) {
1015                 ctdb_fatal(ctdb, "Failed to run setup event\n");
1016                 return;
1017         }
1018         ctdb_run_notification_script(ctdb, "setup");
1019
1020         /* tell all other nodes we've just started up */
1021         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1022                                  0, CTDB_CONTROL_STARTUP, 0,
1023                                  CTDB_CTRL_FLAG_NOREPLY,
1024                                  tdb_null, NULL, NULL);
1025 }
1026
1027 /*
1028   start the protocol going as a daemon
1029 */
1030 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
1031 {
1032         int res, ret = -1;
1033         struct fd_event *fde;
1034         const char *domain_socket_name;
1035         struct signal_event *se;
1036
1037         /* get rid of any old sockets */
1038         unlink(ctdb->daemon.name);
1039
1040         /* create a unix domain stream socket to listen to */
1041         res = ux_socket_bind(ctdb);
1042         if (res!=0) {
1043                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
1044                 exit(10);
1045         }
1046
1047         if (do_fork && fork()) {
1048                 return 0;
1049         }
1050
1051         tdb_reopen_all(False);
1052
1053         if (do_fork) {
1054                 setsid();
1055                 close(0);
1056                 if (open("/dev/null", O_RDONLY) != 0) {
1057                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1058                         exit(11);
1059                 }
1060         }
1061         block_signal(SIGPIPE);
1062
1063         ctdbd_pid = getpid();
1064         ctdb->ctdbd_pid = ctdbd_pid;
1065
1066
1067         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
1068
1069         if (ctdb->do_setsched) {
1070                 /* try to set us up as realtime */
1071                 ctdb_set_scheduler(ctdb);
1072         }
1073
1074         /* ensure the socket is deleted on exit of the daemon */
1075         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1076         if (domain_socket_name == NULL) {
1077                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1078                 exit(12);
1079         }
1080
1081         ctdb->ev = event_context_init(NULL);
1082         tevent_loop_allow_nesting(ctdb->ev);
1083         ret = ctdb_init_tevent_logging(ctdb);
1084         if (ret != 0) {
1085                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1086                 exit(1);
1087         }
1088
1089         ctdb_set_child_logging(ctdb);
1090
1091         /* initialize statistics collection */
1092         ctdb_statistics_init(ctdb);
1093
1094         /* force initial recovery for election */
1095         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1096
1097         if (strcmp(ctdb->transport, "tcp") == 0) {
1098                 int ctdb_tcp_init(struct ctdb_context *);
1099                 ret = ctdb_tcp_init(ctdb);
1100         }
1101 #ifdef USE_INFINIBAND
1102         if (strcmp(ctdb->transport, "ib") == 0) {
1103                 int ctdb_ibw_init(struct ctdb_context *);
1104                 ret = ctdb_ibw_init(ctdb);
1105         }
1106 #endif
1107         if (ret != 0) {
1108                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1109                 return -1;
1110         }
1111
1112         if (ctdb->methods == NULL) {
1113                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1114                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1115         }
1116
1117         /* initialise the transport  */
1118         if (ctdb->methods->initialise(ctdb) != 0) {
1119                 ctdb_fatal(ctdb, "transport failed to initialise");
1120         }
1121         if (public_address_list) {
1122                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
1123                 if (ret == -1) {
1124                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1125                         exit(1);
1126                 }
1127         }
1128
1129
1130         /* attach to existing databases */
1131         if (ctdb_attach_databases(ctdb) != 0) {
1132                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1133         }
1134
1135         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1136         if (ret != 0) {
1137                 ctdb_fatal(ctdb, "Failed to run init event\n");
1138         }
1139         ctdb_run_notification_script(ctdb, "init");
1140
1141         /* start frozen, then let the first election sort things out */
1142         if (ctdb_blocking_freeze(ctdb)) {
1143                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1144         }
1145
1146         /* now start accepting clients, only can do this once frozen */
1147         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1148                            EVENT_FD_READ,
1149                            ctdb_accept_client, ctdb);
1150         tevent_fd_set_auto_close(fde);
1151
1152         /* release any IPs we hold from previous runs of the daemon */
1153         if (ctdb->tunable.disable_ip_failover == 0) {
1154                 ctdb_release_all_ips(ctdb);
1155         }
1156
1157         /* start the transport going */
1158         ctdb_start_transport(ctdb);
1159
1160         /* set up a handler to pick up sigchld */
1161         se = event_add_signal(ctdb->ev, ctdb,
1162                                      SIGCHLD, 0,
1163                                      sig_child_handler,
1164                                      ctdb);
1165         if (se == NULL) {
1166                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1167                 exit(1);
1168         }
1169
1170         ret = ctdb_event_script_callback(ctdb,
1171                                          ctdb,
1172                                          ctdb_setup_event_callback,
1173                                          ctdb,
1174                                          false,
1175                                          CTDB_EVENT_SETUP,
1176                                          "%s",
1177                                          "");
1178         if (ret != 0) {
1179                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1180                 exit(1);
1181         }
1182
1183         if (use_syslog) {
1184                 if (start_syslog_daemon(ctdb)) {
1185                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1186                         exit(10);
1187                 }
1188         }
1189
1190         ctdb_lockdown_memory(ctdb);
1191           
1192         /* go into a wait loop to allow other nodes to complete */
1193         event_loop_wait(ctdb->ev);
1194
1195         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1196         exit(1);
1197 }
1198
1199 /*
1200   allocate a packet for use in daemon<->daemon communication
1201  */
1202 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1203                                                  TALLOC_CTX *mem_ctx, 
1204                                                  enum ctdb_operation operation, 
1205                                                  size_t length, size_t slength,
1206                                                  const char *type)
1207 {
1208         int size;
1209         struct ctdb_req_header *hdr;
1210
1211         length = MAX(length, slength);
1212         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1213
1214         if (ctdb->methods == NULL) {
1215                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1216                          operation, (unsigned)length));
1217                 return NULL;
1218         }
1219
1220         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1221         if (hdr == NULL) {
1222                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1223                          operation, (unsigned)length));
1224                 return NULL;
1225         }
1226         talloc_set_name_const(hdr, type);
1227         memset(hdr, 0, slength);
1228         hdr->length       = length;
1229         hdr->operation    = operation;
1230         hdr->ctdb_magic   = CTDB_MAGIC;
1231         hdr->ctdb_version = CTDB_VERSION;
1232         hdr->generation   = ctdb->vnn_map->generation;
1233         hdr->srcnode      = ctdb->pnn;
1234
1235         return hdr;     
1236 }
1237
1238 struct daemon_control_state {
1239         struct daemon_control_state *next, *prev;
1240         struct ctdb_client *client;
1241         struct ctdb_req_control *c;
1242         uint32_t reqid;
1243         struct ctdb_node *node;
1244 };
1245
1246 /*
1247   callback when a control reply comes in
1248  */
1249 static void daemon_control_callback(struct ctdb_context *ctdb,
1250                                     int32_t status, TDB_DATA data, 
1251                                     const char *errormsg,
1252                                     void *private_data)
1253 {
1254         struct daemon_control_state *state = talloc_get_type(private_data, 
1255                                                              struct daemon_control_state);
1256         struct ctdb_client *client = state->client;
1257         struct ctdb_reply_control *r;
1258         size_t len;
1259         int ret;
1260
1261         /* construct a message to send to the client containing the data */
1262         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1263         if (errormsg) {
1264                 len += strlen(errormsg);
1265         }
1266         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1267                                struct ctdb_reply_control);
1268         CTDB_NO_MEMORY_VOID(ctdb, r);
1269
1270         r->hdr.reqid     = state->reqid;
1271         r->status        = status;
1272         r->datalen       = data.dsize;
1273         r->errorlen = 0;
1274         memcpy(&r->data[0], data.dptr, data.dsize);
1275         if (errormsg) {
1276                 r->errorlen = strlen(errormsg);
1277                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1278         }
1279
1280         ret = daemon_queue_send(client, &r->hdr);
1281         if (ret != -1) {
1282                 talloc_free(state);
1283         }
1284 }
1285
1286 /*
1287   fail all pending controls to a disconnected node
1288  */
1289 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1290 {
1291         struct daemon_control_state *state;
1292         while ((state = node->pending_controls)) {
1293                 DLIST_REMOVE(node->pending_controls, state);
1294                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1295                                         "node is disconnected", state);
1296         }
1297 }
1298
1299 /*
1300   destroy a daemon_control_state
1301  */
1302 static int daemon_control_destructor(struct daemon_control_state *state)
1303 {
1304         if (state->node) {
1305                 DLIST_REMOVE(state->node->pending_controls, state);
1306         }
1307         return 0;
1308 }
1309
1310 /*
1311   this is called when the ctdb daemon received a ctdb request control
1312   from a local client over the unix domain socket
1313  */
1314 static void daemon_request_control_from_client(struct ctdb_client *client, 
1315                                                struct ctdb_req_control *c)
1316 {
1317         TDB_DATA data;
1318         int res;
1319         struct daemon_control_state *state;
1320         TALLOC_CTX *tmp_ctx = talloc_new(client);
1321
1322         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1323                 c->hdr.destnode = client->ctdb->pnn;
1324         }
1325
1326         state = talloc(client, struct daemon_control_state);
1327         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1328
1329         state->client = client;
1330         state->c = talloc_steal(state, c);
1331         state->reqid = c->hdr.reqid;
1332         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1333                 state->node = client->ctdb->nodes[c->hdr.destnode];
1334                 DLIST_ADD(state->node->pending_controls, state);
1335         } else {
1336                 state->node = NULL;
1337         }
1338
1339         talloc_set_destructor(state, daemon_control_destructor);
1340
1341         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1342                 talloc_steal(tmp_ctx, state);
1343         }
1344         
1345         data.dptr = &c->data[0];
1346         data.dsize = c->datalen;
1347         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1348                                        c->srvid, c->opcode, client->client_id,
1349                                        c->flags,
1350                                        data, daemon_control_callback,
1351                                        state);
1352         if (res != 0) {
1353                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1354                          c->hdr.destnode));
1355         }
1356
1357         talloc_free(tmp_ctx);
1358 }
1359
1360 /*
1361   register a call function
1362 */
1363 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1364                          ctdb_fn_t fn, int id)
1365 {
1366         struct ctdb_registered_call *call;
1367         struct ctdb_db_context *ctdb_db;
1368
1369         ctdb_db = find_ctdb_db(ctdb, db_id);
1370         if (ctdb_db == NULL) {
1371                 return -1;
1372         }
1373
1374         call = talloc(ctdb_db, struct ctdb_registered_call);
1375         call->fn = fn;
1376         call->id = id;
1377
1378         DLIST_ADD(ctdb_db->calls, call);        
1379         return 0;
1380 }
1381
1382
1383
1384 /*
1385   this local messaging handler is ugly, but is needed to prevent
1386   recursion in ctdb_send_message() when the destination node is the
1387   same as the source node
1388  */
1389 struct ctdb_local_message {
1390         struct ctdb_context *ctdb;
1391         uint64_t srvid;
1392         TDB_DATA data;
1393 };
1394
1395 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1396                                        struct timeval t, void *private_data)
1397 {
1398         struct ctdb_local_message *m = talloc_get_type(private_data, 
1399                                                        struct ctdb_local_message);
1400         int res;
1401
1402         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1403         if (res != 0) {
1404                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1405                           (unsigned long long)m->srvid));
1406         }
1407         talloc_free(m);
1408 }
1409
1410 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1411 {
1412         struct ctdb_local_message *m;
1413         m = talloc(ctdb, struct ctdb_local_message);
1414         CTDB_NO_MEMORY(ctdb, m);
1415
1416         m->ctdb = ctdb;
1417         m->srvid = srvid;
1418         m->data  = data;
1419         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1420         if (m->data.dptr == NULL) {
1421                 talloc_free(m);
1422                 return -1;
1423         }
1424
1425         /* this needs to be done as an event to prevent recursion */
1426         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1427         return 0;
1428 }
1429
1430 /*
1431   send a ctdb message
1432 */
1433 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1434                              uint64_t srvid, TDB_DATA data)
1435 {
1436         struct ctdb_req_message *r;
1437         int len;
1438
1439         if (ctdb->methods == NULL) {
1440                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1441                 return -1;
1442         }
1443
1444         /* see if this is a message to ourselves */
1445         if (pnn == ctdb->pnn) {
1446                 return ctdb_local_message(ctdb, srvid, data);
1447         }
1448
1449         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1450         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1451                                     struct ctdb_req_message);
1452         CTDB_NO_MEMORY(ctdb, r);
1453
1454         r->hdr.destnode  = pnn;
1455         r->srvid         = srvid;
1456         r->datalen       = data.dsize;
1457         memcpy(&r->data[0], data.dptr, data.dsize);
1458
1459         ctdb_queue_packet(ctdb, &r->hdr);
1460
1461         talloc_free(r);
1462         return 0;
1463 }
1464
1465
1466
1467 struct ctdb_client_notify_list {
1468         struct ctdb_client_notify_list *next, *prev;
1469         struct ctdb_context *ctdb;
1470         uint64_t srvid;
1471         TDB_DATA data;
1472 };
1473
1474
1475 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1476 {
1477         int ret;
1478
1479         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1480
1481         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1482         if (ret != 0) {
1483                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1484         }
1485
1486         return 0;
1487 }
1488
1489 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1490 {
1491         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1492         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1493         struct ctdb_client_notify_list *nl;
1494
1495         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1496
1497         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1498                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1499                 return -1;
1500         }
1501
1502         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1503                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1504                 return -1;
1505         }
1506
1507
1508         if (client == NULL) {
1509                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1510                 return -1;
1511         }
1512
1513         for(nl=client->notify; nl; nl=nl->next) {
1514                 if (nl->srvid == notify->srvid) {
1515                         break;
1516                 }
1517         }
1518         if (nl != NULL) {
1519                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1520                 return -1;
1521         }
1522
1523         nl = talloc(client, struct ctdb_client_notify_list);
1524         CTDB_NO_MEMORY(ctdb, nl);
1525         nl->ctdb       = ctdb;
1526         nl->srvid      = notify->srvid;
1527         nl->data.dsize = notify->len;
1528         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1529         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1530         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1531         
1532         DLIST_ADD(client->notify, nl);
1533         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1534
1535         return 0;
1536 }
1537
1538 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1539 {
1540         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1541         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1542         struct ctdb_client_notify_list *nl;
1543
1544         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1545
1546         if (client == NULL) {
1547                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1548                 return -1;
1549         }
1550
1551         for(nl=client->notify; nl; nl=nl->next) {
1552                 if (nl->srvid == notify->srvid) {
1553                         break;
1554                 }
1555         }
1556         if (nl == NULL) {
1557                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1558                 return -1;
1559         }
1560
1561         DLIST_REMOVE(client->notify, nl);
1562         talloc_set_destructor(nl, NULL);
1563         talloc_free(nl);
1564
1565         return 0;
1566 }
1567
1568 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1569 {
1570         struct ctdb_client_pid_list *client_pid;
1571
1572         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1573                 if (client_pid->pid == pid) {
1574                         return client_pid->client;
1575                 }
1576         }
1577         return NULL;
1578 }
1579
1580
1581 /* This control is used by samba when probing if a process (of a samba daemon)
1582    exists on the node.
1583    Samba does this when it needs/wants to check if a subrecord in one of the
1584    databases is still valied, or if it is stale and can be removed.
1585    If the node is in unhealthy or stopped state we just kill of the samba
1586    process holding htis sub-record and return to the calling samba that
1587    the process does not exist.
1588    This allows us to forcefully recall subrecords registered by samba processes
1589    on banned and stopped nodes.
1590 */
1591 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1592 {
1593         struct ctdb_client *client;
1594
1595         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1596                 client = ctdb_find_client_by_pid(ctdb, pid);
1597                 if (client != NULL) {
1598                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1599                         talloc_free(client);
1600                 }
1601                 return -1;
1602         }
1603
1604         return kill(pid, 0);
1605 }