fix a conflict in the merge from rusty
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
104                         return 0;
105                 }
106         }
107         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
108 }
109
110 /*
111   message handler for when we are in daemon mode. This redirects the message
112   to the right client
113  */
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
115                                     TDB_DATA data, void *private_data)
116 {
117         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118         struct ctdb_req_message *r;
119         int len;
120
121         /* construct a message to send to the client containing the data */
122         len = offsetof(struct ctdb_req_message, data) + data.dsize;
123         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
124                                len, struct ctdb_req_message);
125         CTDB_NO_MEMORY_VOID(ctdb, r);
126
127         talloc_set_name_const(r, "req_message packet");
128
129         r->srvid         = srvid;
130         r->datalen       = data.dsize;
131         memcpy(&r->data[0], data.dptr, data.dsize);
132
133         daemon_queue_send(client, &r->hdr);
134
135         talloc_free(r);
136 }
137
138 /*
139   this is called when the ctdb daemon received a ctdb request to 
140   set the srvid from the client
141  */
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
143 {
144         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
145         int res;
146         if (client == NULL) {
147                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
148                 return -1;
149         }
150         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
151         if (res != 0) {
152                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
153                          (unsigned long long)srvid));
154         } else {
155                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
156                          (unsigned long long)srvid));
157         }
158
159         return res;
160 }
161
162 /*
163   this is called when the ctdb daemon received a ctdb request to 
164   remove a srvid from the client
165  */
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
167 {
168         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169         if (client == NULL) {
170                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
171                 return -1;
172         }
173         return ctdb_deregister_message_handler(ctdb, srvid, client);
174 }
175
176
177 /*
178   destroy a ctdb_client
179 */
180 static int ctdb_client_destructor(struct ctdb_client *client)
181 {
182         struct ctdb_db_context *ctdb_db;
183
184         ctdb_takeover_client_destructor_hook(client);
185         ctdb_reqid_remove(client->ctdb, client->client_id);
186         if (client->ctdb->statistics.num_clients) {
187                 client->ctdb->statistics.num_clients--;
188         }
189
190         if (client->num_persistent_updates != 0) {
191                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
193         }
194         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
195         if (ctdb_db) {
196                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197                                   "commit active. Forcing recovery.\n"));
198                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199                 ctdb_db->transaction_active = false;
200         }
201
202         return 0;
203 }
204
205
206 /*
207   this is called when the ctdb daemon received a ctdb request message
208   from a local client over the unix domain socket
209  */
210 static void daemon_request_message_from_client(struct ctdb_client *client, 
211                                                struct ctdb_req_message *c)
212 {
213         TDB_DATA data;
214         int res;
215
216         /* maybe the message is for another client on this node */
217         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
219                 return;
220         }
221
222         /* its for a remote node */
223         data.dptr = &c->data[0];
224         data.dsize = c->datalen;
225         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
226                                        c->srvid, data);
227         if (res != 0) {
228                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
229                          c->hdr.destnode));
230         }
231 }
232
233
234 struct daemon_call_state {
235         struct ctdb_client *client;
236         uint32_t reqid;
237         struct ctdb_call *call;
238         struct timeval start_time;
239 };
240
241 /* 
242    complete a call from a client 
243 */
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
245 {
246         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
247                                                            struct daemon_call_state);
248         struct ctdb_reply_call *r;
249         int res;
250         uint32_t length;
251         struct ctdb_client *client = dstate->client;
252         struct ctdb_db_context *ctdb_db = state->ctdb_db;
253
254         talloc_steal(client, dstate);
255         talloc_steal(dstate, dstate->call);
256
257         res = ctdb_daemon_call_recv(state, dstate->call);
258         if (res != 0) {
259                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260                 if (client->ctdb->statistics.pending_calls > 0) {
261                         client->ctdb->statistics.pending_calls--;
262                 }
263                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
264                 return;
265         }
266
267         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
269                                length, struct ctdb_reply_call);
270         if (r == NULL) {
271                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272                 if (client->ctdb->statistics.pending_calls > 0) {
273                         client->ctdb->statistics.pending_calls--;
274                 }
275                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
276                 return;
277         }
278         r->hdr.reqid        = dstate->reqid;
279         r->datalen          = dstate->call->reply_data.dsize;
280         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
281
282         res = daemon_queue_send(client, &r->hdr);
283         if (res != 0) {
284                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
287         talloc_free(dstate);
288         if (client->ctdb->statistics.pending_calls > 0) {
289                 client->ctdb->statistics.pending_calls--;
290         }
291 }
292
293 struct ctdb_daemon_packet_wrap {
294         struct ctdb_context *ctdb;
295         uint32_t client_id;
296 };
297
298 /*
299   a wrapper to catch disconnected clients
300  */
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
302 {
303         struct ctdb_client *client;
304         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
305                                                             struct ctdb_daemon_packet_wrap);
306         if (w == NULL) {
307                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
308                 return;
309         }
310
311         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312         if (client == NULL) {
313                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
314                          w->client_id));
315                 talloc_free(w);
316                 return;
317         }
318         talloc_free(w);
319
320         /* process it */
321         daemon_incoming_packet(client, hdr);    
322 }
323
324
325 /*
326   this is called when the ctdb daemon received a ctdb request call
327   from a local client over the unix domain socket
328  */
329 static void daemon_request_call_from_client(struct ctdb_client *client, 
330                                             struct ctdb_req_call *c)
331 {
332         struct ctdb_call_state *state;
333         struct ctdb_db_context *ctdb_db;
334         struct daemon_call_state *dstate;
335         struct ctdb_call *call;
336         struct ctdb_ltdb_header header;
337         TDB_DATA key, data;
338         int ret;
339         struct ctdb_context *ctdb = client->ctdb;
340         struct ctdb_daemon_packet_wrap *w;
341
342         ctdb->statistics.total_calls++;
343         if (client->ctdb->statistics.pending_calls > 0) {
344                 ctdb->statistics.pending_calls++;
345         }
346
347         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
348         if (!ctdb_db) {
349                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
350                           c->db_id));
351                 if (client->ctdb->statistics.pending_calls > 0) {
352                         ctdb->statistics.pending_calls--;
353                 }
354                 return;
355         }
356
357         if (ctdb_db->unhealthy_reason) {
358                 /*
359                  * this is just a warning, as the tdb should be empty anyway,
360                  * and only persistent databases can be unhealthy, which doesn't
361                  * use this code patch
362                  */
363                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
364                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
365         }
366
367         key.dptr = c->data;
368         key.dsize = c->keylen;
369
370         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
371         CTDB_NO_MEMORY_VOID(ctdb, w);   
372
373         w->ctdb = ctdb;
374         w->client_id = client->client_id;
375
376         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
377                                            (struct ctdb_req_header *)c, &data,
378                                            daemon_incoming_packet_wrap, w, True);
379         if (ret == -2) {
380                 /* will retry later */
381                 if (client->ctdb->statistics.pending_calls > 0) {
382                         ctdb->statistics.pending_calls--;
383                 }
384                 return;
385         }
386
387         talloc_free(w);
388
389         if (ret != 0) {
390                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
391                 if (client->ctdb->statistics.pending_calls > 0) {
392                         ctdb->statistics.pending_calls--;
393                 }
394                 return;
395         }
396
397         dstate = talloc(client, struct daemon_call_state);
398         if (dstate == NULL) {
399                 ctdb_ltdb_unlock(ctdb_db, key);
400                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
401                 if (client->ctdb->statistics.pending_calls > 0) {
402                         ctdb->statistics.pending_calls--;
403                 }
404                 return;
405         }
406         dstate->start_time = timeval_current();
407         dstate->client = client;
408         dstate->reqid  = c->hdr.reqid;
409         talloc_steal(dstate, data.dptr);
410
411         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
412         if (call == NULL) {
413                 ctdb_ltdb_unlock(ctdb_db, key);
414                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
415                 if (client->ctdb->statistics.pending_calls > 0) {
416                         ctdb->statistics.pending_calls--;
417                 }
418                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
419                 return;
420         }
421
422         call->call_id = c->callid;
423         call->key = key;
424         call->call_data.dptr = c->data + c->keylen;
425         call->call_data.dsize = c->calldatalen;
426         call->flags = c->flags;
427
428         if (header.dmaster == ctdb->pnn) {
429                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
430         } else {
431                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
432         }
433
434         ctdb_ltdb_unlock(ctdb_db, key);
435
436         if (state == NULL) {
437                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
438                 if (client->ctdb->statistics.pending_calls > 0) {
439                         ctdb->statistics.pending_calls--;
440                 }
441                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
442                 return;
443         }
444         talloc_steal(state, dstate);
445         talloc_steal(client, state);
446
447         state->async.fn = daemon_call_from_client_callback;
448         state->async.private_data = dstate;
449 }
450
451
452 static void daemon_request_control_from_client(struct ctdb_client *client, 
453                                                struct ctdb_req_control *c);
454
455 /* data contains a packet from the client */
456 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
457 {
458         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
459         TALLOC_CTX *tmp_ctx;
460         struct ctdb_context *ctdb = client->ctdb;
461
462         /* place the packet as a child of a tmp_ctx. We then use
463            talloc_free() below to free it. If any of the calls want
464            to keep it, then they will steal it somewhere else, and the
465            talloc_free() will be a no-op */
466         tmp_ctx = talloc_new(client);
467         talloc_steal(tmp_ctx, hdr);
468
469         if (hdr->ctdb_magic != CTDB_MAGIC) {
470                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
471                 goto done;
472         }
473
474         if (hdr->ctdb_version != CTDB_VERSION) {
475                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
476                 goto done;
477         }
478
479         switch (hdr->operation) {
480         case CTDB_REQ_CALL:
481                 ctdb->statistics.client.req_call++;
482                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
483                 break;
484
485         case CTDB_REQ_MESSAGE:
486                 ctdb->statistics.client.req_message++;
487                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
488                 break;
489
490         case CTDB_REQ_CONTROL:
491                 ctdb->statistics.client.req_control++;
492                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
493                 break;
494
495         default:
496                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
497                          hdr->operation));
498         }
499
500 done:
501         talloc_free(tmp_ctx);
502 }
503
504 /*
505   called when the daemon gets a incoming packet
506  */
507 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
508 {
509         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
510         struct ctdb_req_header *hdr;
511
512         if (cnt == 0) {
513                 talloc_free(client);
514                 return;
515         }
516
517         client->ctdb->statistics.client_packets_recv++;
518
519         if (cnt < sizeof(*hdr)) {
520                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
521                                (unsigned)cnt);
522                 return;
523         }
524         hdr = (struct ctdb_req_header *)data;
525         if (cnt != hdr->length) {
526                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
527                                (unsigned)hdr->length, (unsigned)cnt);
528                 return;
529         }
530
531         if (hdr->ctdb_magic != CTDB_MAGIC) {
532                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
533                 return;
534         }
535
536         if (hdr->ctdb_version != CTDB_VERSION) {
537                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
538                 return;
539         }
540
541         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
542                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
543                  hdr->srcnode, hdr->destnode));
544
545         /* it is the responsibility of the incoming packet function to free 'data' */
546         daemon_incoming_packet(client, hdr);
547 }
548
549
550 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
551 {
552         if (client_pid->ctdb->client_pids != NULL) {
553                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
554         }
555
556         return 0;
557 }
558
559
560 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
561                          uint16_t flags, void *private_data)
562 {
563         struct sockaddr_un addr;
564         socklen_t len;
565         int fd;
566         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
567         struct ctdb_client *client;
568         struct ctdb_client_pid_list *client_pid;
569 #ifdef _AIX
570         struct peercred_struct cr;
571         socklen_t crl = sizeof(struct peercred_struct);
572 #else
573         struct ucred cr;
574         socklen_t crl = sizeof(struct ucred);
575 #endif
576
577         memset(&addr, 0, sizeof(addr));
578         len = sizeof(addr);
579         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
580         if (fd == -1) {
581                 return;
582         }
583
584         set_nonblocking(fd);
585         set_close_on_exec(fd);
586
587         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
588
589         client = talloc_zero(ctdb, struct ctdb_client);
590 #ifdef _AIX
591         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
592 #else
593         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
594 #endif
595                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
596         }
597
598         client->ctdb = ctdb;
599         client->fd = fd;
600         client->client_id = ctdb_reqid_new(ctdb, client);
601         client->pid = cr.pid;
602
603         client_pid = talloc(client, struct ctdb_client_pid_list);
604         if (client_pid == NULL) {
605                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
606                 close(fd);
607                 talloc_free(client);
608                 return;
609         }               
610         client_pid->ctdb   = ctdb;
611         client_pid->pid    = cr.pid;
612         client_pid->client = client;
613
614         DLIST_ADD(ctdb->client_pids, client_pid);
615
616         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
617                                          ctdb_daemon_read_cb, client);
618
619         talloc_set_destructor(client, ctdb_client_destructor);
620         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
621         ctdb->statistics.num_clients++;
622 }
623
624
625
626 /*
627   create a unix domain socket and bind it
628   return a file descriptor open on the socket 
629 */
630 static int ux_socket_bind(struct ctdb_context *ctdb)
631 {
632         struct sockaddr_un addr;
633
634         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
635         if (ctdb->daemon.sd == -1) {
636                 return -1;
637         }
638
639         set_close_on_exec(ctdb->daemon.sd);
640         set_nonblocking(ctdb->daemon.sd);
641
642         memset(&addr, 0, sizeof(addr));
643         addr.sun_family = AF_UNIX;
644         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
645
646         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
647                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
648                 goto failed;
649         }       
650
651         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
652             chmod(ctdb->daemon.name, 0700) != 0) {
653                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
654                 goto failed;
655         } 
656
657
658         if (listen(ctdb->daemon.sd, 100) != 0) {
659                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
660                 goto failed;
661         }
662
663         return 0;
664
665 failed:
666         close(ctdb->daemon.sd);
667         ctdb->daemon.sd = -1;
668         return -1;      
669 }
670
671 static void sig_child_handler(struct event_context *ev,
672         struct signal_event *se, int signum, int count,
673         void *dont_care, 
674         void *private_data)
675 {
676 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
677         int status;
678         pid_t pid = -1;
679
680         while (pid != 0) {
681                 pid = waitpid(-1, &status, WNOHANG);
682                 if (pid == -1) {
683                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
684                         return;
685                 }
686                 if (pid > 0) {
687                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
688                 }
689         }
690 }
691
692 /*
693   start the protocol going as a daemon
694 */
695 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
696 {
697         int res, ret = -1;
698         struct fd_event *fde;
699         const char *domain_socket_name;
700         struct signal_event *se;
701
702         /* get rid of any old sockets */
703         unlink(ctdb->daemon.name);
704
705         /* create a unix domain stream socket to listen to */
706         res = ux_socket_bind(ctdb);
707         if (res!=0) {
708                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
709                 exit(10);
710         }
711
712         if (do_fork && fork()) {
713                 return 0;
714         }
715
716         tdb_reopen_all(False);
717
718         if (do_fork) {
719                 setsid();
720                 close(0);
721                 if (open("/dev/null", O_RDONLY) != 0) {
722                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
723                         exit(11);
724                 }
725         }
726         block_signal(SIGPIPE);
727
728         ctdbd_pid = getpid();
729
730
731         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
732
733         ctdb_high_priority(ctdb);
734
735         /* ensure the socket is deleted on exit of the daemon */
736         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
737         if (domain_socket_name == NULL) {
738                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
739                 exit(12);
740         }
741
742         ctdb->ev = event_context_init(NULL);
743
744         ctdb_set_child_logging(ctdb);
745
746         /* force initial recovery for election */
747         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
748
749         if (strcmp(ctdb->transport, "tcp") == 0) {
750                 int ctdb_tcp_init(struct ctdb_context *);
751                 ret = ctdb_tcp_init(ctdb);
752         }
753 #ifdef USE_INFINIBAND
754         if (strcmp(ctdb->transport, "ib") == 0) {
755                 int ctdb_ibw_init(struct ctdb_context *);
756                 ret = ctdb_ibw_init(ctdb);
757         }
758 #endif
759         if (ret != 0) {
760                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
761                 return -1;
762         }
763
764         if (ctdb->methods == NULL) {
765                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
766                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
767         }
768
769         /* initialise the transport  */
770         if (ctdb->methods->initialise(ctdb) != 0) {
771                 ctdb_fatal(ctdb, "transport failed to initialise");
772         }
773
774         /* attach to existing databases */
775         if (ctdb_attach_databases(ctdb) != 0) {
776                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
777         }
778
779         /* start frozen, then let the first election sort things out */
780         if (ctdb_blocking_freeze(ctdb)) {
781                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
782         }
783
784         /* now start accepting clients, only can do this once frozen */
785         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
786                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
787                            ctdb_accept_client, ctdb);
788
789         /* tell all other nodes we've just started up */
790         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
791                                  0, CTDB_CONTROL_STARTUP, 0,
792                                  CTDB_CTRL_FLAG_NOREPLY,
793                                  tdb_null, NULL, NULL);
794
795         /* release any IPs we hold from previous runs of the daemon */
796         ctdb_release_all_ips(ctdb);
797
798         /* start the transport going */
799         ctdb_start_transport(ctdb);
800
801         /* set up a handler to pick up sigchld */
802         se = event_add_signal(ctdb->ev, ctdb,
803                                      SIGCHLD, 0,
804                                      sig_child_handler,
805                                      ctdb);
806         if (se == NULL) {
807                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
808                 exit(1);
809         }
810
811         if (use_syslog) {
812                 if (start_syslog_daemon(ctdb)) {
813                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
814                         exit(10);
815                 }
816         }
817
818         ctdb_lockdown_memory(ctdb);
819           
820         /* go into a wait loop to allow other nodes to complete */
821         event_loop_wait(ctdb->ev);
822
823         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
824         exit(1);
825 }
826
827 /*
828   allocate a packet for use in daemon<->daemon communication
829  */
830 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
831                                                  TALLOC_CTX *mem_ctx, 
832                                                  enum ctdb_operation operation, 
833                                                  size_t length, size_t slength,
834                                                  const char *type)
835 {
836         int size;
837         struct ctdb_req_header *hdr;
838
839         length = MAX(length, slength);
840         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
841
842         if (ctdb->methods == NULL) {
843                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
844                          operation, (unsigned)length));
845                 return NULL;
846         }
847
848         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
849         if (hdr == NULL) {
850                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
851                          operation, (unsigned)length));
852                 return NULL;
853         }
854         talloc_set_name_const(hdr, type);
855         memset(hdr, 0, slength);
856         hdr->length       = length;
857         hdr->operation    = operation;
858         hdr->ctdb_magic   = CTDB_MAGIC;
859         hdr->ctdb_version = CTDB_VERSION;
860         hdr->generation   = ctdb->vnn_map->generation;
861         hdr->srcnode      = ctdb->pnn;
862
863         return hdr;     
864 }
865
866 struct daemon_control_state {
867         struct daemon_control_state *next, *prev;
868         struct ctdb_client *client;
869         struct ctdb_req_control *c;
870         uint32_t reqid;
871         struct ctdb_node *node;
872 };
873
874 /*
875   callback when a control reply comes in
876  */
877 static void daemon_control_callback(struct ctdb_context *ctdb,
878                                     int32_t status, TDB_DATA data, 
879                                     const char *errormsg,
880                                     void *private_data)
881 {
882         struct daemon_control_state *state = talloc_get_type(private_data, 
883                                                              struct daemon_control_state);
884         struct ctdb_client *client = state->client;
885         struct ctdb_reply_control *r;
886         size_t len;
887
888         /* construct a message to send to the client containing the data */
889         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
890         if (errormsg) {
891                 len += strlen(errormsg);
892         }
893         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
894                                struct ctdb_reply_control);
895         CTDB_NO_MEMORY_VOID(ctdb, r);
896
897         r->hdr.reqid     = state->reqid;
898         r->status        = status;
899         r->datalen       = data.dsize;
900         r->errorlen = 0;
901         memcpy(&r->data[0], data.dptr, data.dsize);
902         if (errormsg) {
903                 r->errorlen = strlen(errormsg);
904                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
905         }
906
907         daemon_queue_send(client, &r->hdr);
908
909         talloc_free(state);
910 }
911
912 /*
913   fail all pending controls to a disconnected node
914  */
915 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
916 {
917         struct daemon_control_state *state;
918         while ((state = node->pending_controls)) {
919                 DLIST_REMOVE(node->pending_controls, state);
920                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
921                                         "node is disconnected", state);
922         }
923 }
924
925 /*
926   destroy a daemon_control_state
927  */
928 static int daemon_control_destructor(struct daemon_control_state *state)
929 {
930         if (state->node) {
931                 DLIST_REMOVE(state->node->pending_controls, state);
932         }
933         return 0;
934 }
935
936 /*
937   this is called when the ctdb daemon received a ctdb request control
938   from a local client over the unix domain socket
939  */
940 static void daemon_request_control_from_client(struct ctdb_client *client, 
941                                                struct ctdb_req_control *c)
942 {
943         TDB_DATA data;
944         int res;
945         struct daemon_control_state *state;
946         TALLOC_CTX *tmp_ctx = talloc_new(client);
947
948         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
949                 c->hdr.destnode = client->ctdb->pnn;
950         }
951
952         state = talloc(client, struct daemon_control_state);
953         CTDB_NO_MEMORY_VOID(client->ctdb, state);
954
955         state->client = client;
956         state->c = talloc_steal(state, c);
957         state->reqid = c->hdr.reqid;
958         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
959                 state->node = client->ctdb->nodes[c->hdr.destnode];
960                 DLIST_ADD(state->node->pending_controls, state);
961         } else {
962                 state->node = NULL;
963         }
964
965         talloc_set_destructor(state, daemon_control_destructor);
966
967         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
968                 talloc_steal(tmp_ctx, state);
969         }
970         
971         data.dptr = &c->data[0];
972         data.dsize = c->datalen;
973         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
974                                        c->srvid, c->opcode, client->client_id,
975                                        c->flags,
976                                        data, daemon_control_callback,
977                                        state);
978         if (res != 0) {
979                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
980                          c->hdr.destnode));
981         }
982
983         talloc_free(tmp_ctx);
984 }
985
986 /*
987   register a call function
988 */
989 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
990                          ctdb_fn_t fn, int id)
991 {
992         struct ctdb_registered_call *call;
993         struct ctdb_db_context *ctdb_db;
994
995         ctdb_db = find_ctdb_db(ctdb, db_id);
996         if (ctdb_db == NULL) {
997                 return -1;
998         }
999
1000         call = talloc(ctdb_db, struct ctdb_registered_call);
1001         call->fn = fn;
1002         call->id = id;
1003
1004         DLIST_ADD(ctdb_db->calls, call);        
1005         return 0;
1006 }
1007
1008
1009
1010 /*
1011   this local messaging handler is ugly, but is needed to prevent
1012   recursion in ctdb_send_message() when the destination node is the
1013   same as the source node
1014  */
1015 struct ctdb_local_message {
1016         struct ctdb_context *ctdb;
1017         uint64_t srvid;
1018         TDB_DATA data;
1019 };
1020
1021 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1022                                        struct timeval t, void *private_data)
1023 {
1024         struct ctdb_local_message *m = talloc_get_type(private_data, 
1025                                                        struct ctdb_local_message);
1026         int res;
1027
1028         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1029         if (res != 0) {
1030                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1031                           (unsigned long long)m->srvid));
1032         }
1033         talloc_free(m);
1034 }
1035
1036 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1037 {
1038         struct ctdb_local_message *m;
1039         m = talloc(ctdb, struct ctdb_local_message);
1040         CTDB_NO_MEMORY(ctdb, m);
1041
1042         m->ctdb = ctdb;
1043         m->srvid = srvid;
1044         m->data  = data;
1045         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1046         if (m->data.dptr == NULL) {
1047                 talloc_free(m);
1048                 return -1;
1049         }
1050
1051         /* this needs to be done as an event to prevent recursion */
1052         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1053         return 0;
1054 }
1055
1056 /*
1057   send a ctdb message
1058 */
1059 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1060                              uint64_t srvid, TDB_DATA data)
1061 {
1062         struct ctdb_req_message *r;
1063         int len;
1064
1065         if (ctdb->methods == NULL) {
1066                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1067                 return -1;
1068         }
1069
1070         /* see if this is a message to ourselves */
1071         if (pnn == ctdb->pnn) {
1072                 return ctdb_local_message(ctdb, srvid, data);
1073         }
1074
1075         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1076         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1077                                     struct ctdb_req_message);
1078         CTDB_NO_MEMORY(ctdb, r);
1079
1080         r->hdr.destnode  = pnn;
1081         r->srvid         = srvid;
1082         r->datalen       = data.dsize;
1083         memcpy(&r->data[0], data.dptr, data.dsize);
1084
1085         ctdb_queue_packet(ctdb, &r->hdr);
1086
1087         talloc_free(r);
1088         return 0;
1089 }
1090
1091
1092
1093 struct ctdb_client_notify_list {
1094         struct ctdb_client_notify_list *next, *prev;
1095         struct ctdb_context *ctdb;
1096         uint64_t srvid;
1097         TDB_DATA data;
1098 };
1099
1100
1101 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1102 {
1103         int ret;
1104
1105         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1106
1107         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1108         if (ret != 0) {
1109                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1110         }
1111
1112         return 0;
1113 }
1114
1115 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1116 {
1117         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1118         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1119         struct ctdb_client_notify_list *nl;
1120
1121         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1122
1123         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1125                 return -1;
1126         }
1127
1128         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1129                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1130                 return -1;
1131         }
1132
1133
1134         if (client == NULL) {
1135                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1136                 return -1;
1137         }
1138
1139         for(nl=client->notify; nl; nl=nl->next) {
1140                 if (nl->srvid == notify->srvid) {
1141                         break;
1142                 }
1143         }
1144         if (nl != NULL) {
1145                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1146                 return -1;
1147         }
1148
1149         nl = talloc(client, struct ctdb_client_notify_list);
1150         CTDB_NO_MEMORY(ctdb, nl);
1151         nl->ctdb       = ctdb;
1152         nl->srvid      = notify->srvid;
1153         nl->data.dsize = notify->len;
1154         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1155         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1156         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1157         
1158         DLIST_ADD(client->notify, nl);
1159         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1160
1161         return 0;
1162 }
1163
1164 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1165 {
1166         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1167         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1168         struct ctdb_client_notify_list *nl;
1169
1170         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1171
1172         if (client == NULL) {
1173                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1174                 return -1;
1175         }
1176
1177         for(nl=client->notify; nl; nl=nl->next) {
1178                 if (nl->srvid == notify->srvid) {
1179                         break;
1180                 }
1181         }
1182         if (nl == NULL) {
1183                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1184                 return -1;
1185         }
1186
1187         DLIST_REMOVE(client->notify, nl);
1188         talloc_set_destructor(nl, NULL);
1189         talloc_free(nl);
1190
1191         return 0;
1192 }
1193
1194 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1195 {
1196         struct ctdb_client_pid_list *client_pid;
1197
1198         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1199                 if (client_pid->pid == pid) {
1200                         return client_pid->client;
1201                 }
1202         }
1203         return NULL;
1204 }
1205
1206
1207 /* This control is used by samba when probing if a process (of a samba daemon)
1208    exists on the node.
1209    Samba does this when it needs/wants to check if a subrecord in one of the
1210    databases is still valied, or if it is stale and can be removed.
1211    If the node is in unhealthy or stopped state we just kill of the samba
1212    process holding htis sub-record and return to the calling samba that
1213    the process does not exist.
1214    This allows us to forcefully recall subrecords registered by samba processes
1215    on banned and stopped nodes.
1216 */
1217 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1218 {
1219         struct ctdb_client *client;
1220
1221         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1222                 client = ctdb_find_client_by_pid(ctdb, pid);
1223                 if (client != NULL) {
1224                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1225                         talloc_free(client);
1226                 }
1227                 return -1;
1228         }
1229
1230         return kill(pid, 0);
1231 }