event: Update events to latest Samba version 0.9.8
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         if (client->ctdb->statistics.num_clients) {
188                 client->ctdb->statistics.num_clients--;
189         }
190
191         if (client->num_persistent_updates != 0) {
192                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194         }
195         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196         if (ctdb_db) {
197                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198                                   "commit active. Forcing recovery.\n"));
199                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200                 ctdb_db->transaction_active = false;
201         }
202
203         return 0;
204 }
205
206
207 /*
208   this is called when the ctdb daemon received a ctdb request message
209   from a local client over the unix domain socket
210  */
211 static void daemon_request_message_from_client(struct ctdb_client *client, 
212                                                struct ctdb_req_message *c)
213 {
214         TDB_DATA data;
215         int res;
216
217         /* maybe the message is for another client on this node */
218         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
220                 return;
221         }
222
223         /* its for a remote node */
224         data.dptr = &c->data[0];
225         data.dsize = c->datalen;
226         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227                                        c->srvid, data);
228         if (res != 0) {
229                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
230                          c->hdr.destnode));
231         }
232 }
233
234
235 struct daemon_call_state {
236         struct ctdb_client *client;
237         uint32_t reqid;
238         struct ctdb_call *call;
239         struct timeval start_time;
240 };
241
242 /* 
243    complete a call from a client 
244 */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 {
247         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
248                                                            struct daemon_call_state);
249         struct ctdb_reply_call *r;
250         int res;
251         uint32_t length;
252         struct ctdb_client *client = dstate->client;
253         struct ctdb_db_context *ctdb_db = state->ctdb_db;
254
255         talloc_steal(client, dstate);
256         talloc_steal(dstate, dstate->call);
257
258         res = ctdb_daemon_call_recv(state, dstate->call);
259         if (res != 0) {
260                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261                 if (client->ctdb->statistics.pending_calls > 0) {
262                         client->ctdb->statistics.pending_calls--;
263                 }
264                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
265                 return;
266         }
267
268         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
270                                length, struct ctdb_reply_call);
271         if (r == NULL) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273                 if (client->ctdb->statistics.pending_calls > 0) {
274                         client->ctdb->statistics.pending_calls--;
275                 }
276                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
277                 return;
278         }
279         r->hdr.reqid        = dstate->reqid;
280         r->datalen          = dstate->call->reply_data.dsize;
281         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282
283         res = daemon_queue_send(client, &r->hdr);
284         if (res == -1) {
285                 /* client is dead - return immediately */
286                 return;
287         }
288         if (res != 0) {
289                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
290         }
291         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
292         talloc_free(dstate);
293         if (client->ctdb->statistics.pending_calls > 0) {
294                 client->ctdb->statistics.pending_calls--;
295         }
296 }
297
298 struct ctdb_daemon_packet_wrap {
299         struct ctdb_context *ctdb;
300         uint32_t client_id;
301 };
302
303 /*
304   a wrapper to catch disconnected clients
305  */
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
307 {
308         struct ctdb_client *client;
309         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
310                                                             struct ctdb_daemon_packet_wrap);
311         if (w == NULL) {
312                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
313                 return;
314         }
315
316         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317         if (client == NULL) {
318                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
319                          w->client_id));
320                 talloc_free(w);
321                 return;
322         }
323         talloc_free(w);
324
325         /* process it */
326         daemon_incoming_packet(client, hdr);    
327 }
328
329
330 /*
331   this is called when the ctdb daemon received a ctdb request call
332   from a local client over the unix domain socket
333  */
334 static void daemon_request_call_from_client(struct ctdb_client *client, 
335                                             struct ctdb_req_call *c)
336 {
337         struct ctdb_call_state *state;
338         struct ctdb_db_context *ctdb_db;
339         struct daemon_call_state *dstate;
340         struct ctdb_call *call;
341         struct ctdb_ltdb_header header;
342         TDB_DATA key, data;
343         int ret;
344         struct ctdb_context *ctdb = client->ctdb;
345         struct ctdb_daemon_packet_wrap *w;
346
347         ctdb->statistics.total_calls++;
348         if (client->ctdb->statistics.pending_calls > 0) {
349                 ctdb->statistics.pending_calls++;
350         }
351
352         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
353         if (!ctdb_db) {
354                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
355                           c->db_id));
356                 if (client->ctdb->statistics.pending_calls > 0) {
357                         ctdb->statistics.pending_calls--;
358                 }
359                 return;
360         }
361
362         if (ctdb_db->unhealthy_reason) {
363                 /*
364                  * this is just a warning, as the tdb should be empty anyway,
365                  * and only persistent databases can be unhealthy, which doesn't
366                  * use this code patch
367                  */
368                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
370         }
371
372         key.dptr = c->data;
373         key.dsize = c->keylen;
374
375         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376         CTDB_NO_MEMORY_VOID(ctdb, w);   
377
378         w->ctdb = ctdb;
379         w->client_id = client->client_id;
380
381         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
382                                            (struct ctdb_req_header *)c, &data,
383                                            daemon_incoming_packet_wrap, w, True);
384         if (ret == -2) {
385                 /* will retry later */
386                 if (client->ctdb->statistics.pending_calls > 0) {
387                         ctdb->statistics.pending_calls--;
388                 }
389                 return;
390         }
391
392         talloc_free(w);
393
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396                 if (client->ctdb->statistics.pending_calls > 0) {
397                         ctdb->statistics.pending_calls--;
398                 }
399                 return;
400         }
401
402         dstate = talloc(client, struct daemon_call_state);
403         if (dstate == NULL) {
404                 ret = ctdb_ltdb_unlock(ctdb_db, key);
405                 if (ret != 0) {
406                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
407                 }
408
409                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
410                 if (client->ctdb->statistics.pending_calls > 0) {
411                         ctdb->statistics.pending_calls--;
412                 }
413                 return;
414         }
415         dstate->start_time = timeval_current();
416         dstate->client = client;
417         dstate->reqid  = c->hdr.reqid;
418         talloc_steal(dstate, data.dptr);
419
420         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
421         if (call == NULL) {
422                 ret = ctdb_ltdb_unlock(ctdb_db, key);
423                 if (ret != 0) {
424                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
425                 }
426
427                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
428                 if (client->ctdb->statistics.pending_calls > 0) {
429                         ctdb->statistics.pending_calls--;
430                 }
431                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
432                 return;
433         }
434
435         call->call_id = c->callid;
436         call->key = key;
437         call->call_data.dptr = c->data + c->keylen;
438         call->call_data.dsize = c->calldatalen;
439         call->flags = c->flags;
440
441         if (header.dmaster == ctdb->pnn) {
442                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
443         } else {
444                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
445         }
446
447         ret = ctdb_ltdb_unlock(ctdb_db, key);
448         if (ret != 0) {
449                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
450         }
451
452         if (state == NULL) {
453                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
454                 if (client->ctdb->statistics.pending_calls > 0) {
455                         ctdb->statistics.pending_calls--;
456                 }
457                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
458                 return;
459         }
460         talloc_steal(state, dstate);
461         talloc_steal(client, state);
462
463         state->async.fn = daemon_call_from_client_callback;
464         state->async.private_data = dstate;
465 }
466
467
468 static void daemon_request_control_from_client(struct ctdb_client *client, 
469                                                struct ctdb_req_control *c);
470
471 /* data contains a packet from the client */
472 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
473 {
474         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
475         TALLOC_CTX *tmp_ctx;
476         struct ctdb_context *ctdb = client->ctdb;
477
478         /* place the packet as a child of a tmp_ctx. We then use
479            talloc_free() below to free it. If any of the calls want
480            to keep it, then they will steal it somewhere else, and the
481            talloc_free() will be a no-op */
482         tmp_ctx = talloc_new(client);
483         talloc_steal(tmp_ctx, hdr);
484
485         if (hdr->ctdb_magic != CTDB_MAGIC) {
486                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
487                 goto done;
488         }
489
490         if (hdr->ctdb_version != CTDB_VERSION) {
491                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
492                 goto done;
493         }
494
495         switch (hdr->operation) {
496         case CTDB_REQ_CALL:
497                 ctdb->statistics.client.req_call++;
498                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
499                 break;
500
501         case CTDB_REQ_MESSAGE:
502                 ctdb->statistics.client.req_message++;
503                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
504                 break;
505
506         case CTDB_REQ_CONTROL:
507                 ctdb->statistics.client.req_control++;
508                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
509                 break;
510
511         default:
512                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
513                          hdr->operation));
514         }
515
516 done:
517         talloc_free(tmp_ctx);
518 }
519
520 /*
521   called when the daemon gets a incoming packet
522  */
523 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
524 {
525         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
526         struct ctdb_req_header *hdr;
527
528         if (cnt == 0) {
529                 talloc_free(client);
530                 return;
531         }
532
533         client->ctdb->statistics.client_packets_recv++;
534
535         if (cnt < sizeof(*hdr)) {
536                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
537                                (unsigned)cnt);
538                 return;
539         }
540         hdr = (struct ctdb_req_header *)data;
541         if (cnt != hdr->length) {
542                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
543                                (unsigned)hdr->length, (unsigned)cnt);
544                 return;
545         }
546
547         if (hdr->ctdb_magic != CTDB_MAGIC) {
548                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
549                 return;
550         }
551
552         if (hdr->ctdb_version != CTDB_VERSION) {
553                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
554                 return;
555         }
556
557         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
558                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
559                  hdr->srcnode, hdr->destnode));
560
561         /* it is the responsibility of the incoming packet function to free 'data' */
562         daemon_incoming_packet(client, hdr);
563 }
564
565
566 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
567 {
568         if (client_pid->ctdb->client_pids != NULL) {
569                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
570         }
571
572         return 0;
573 }
574
575
576 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
577                          uint16_t flags, void *private_data)
578 {
579         struct sockaddr_un addr;
580         socklen_t len;
581         int fd;
582         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
583         struct ctdb_client *client;
584         struct ctdb_client_pid_list *client_pid;
585 #ifdef _AIX
586         struct peercred_struct cr;
587         socklen_t crl = sizeof(struct peercred_struct);
588 #else
589         struct ucred cr;
590         socklen_t crl = sizeof(struct ucred);
591 #endif
592
593         memset(&addr, 0, sizeof(addr));
594         len = sizeof(addr);
595         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
596         if (fd == -1) {
597                 return;
598         }
599
600         set_nonblocking(fd);
601         set_close_on_exec(fd);
602
603         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
604
605         client = talloc_zero(ctdb, struct ctdb_client);
606 #ifdef _AIX
607         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
608 #else
609         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
610 #endif
611                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
612         }
613
614         client->ctdb = ctdb;
615         client->fd = fd;
616         client->client_id = ctdb_reqid_new(ctdb, client);
617         client->pid = cr.pid;
618
619         client_pid = talloc(client, struct ctdb_client_pid_list);
620         if (client_pid == NULL) {
621                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
622                 close(fd);
623                 talloc_free(client);
624                 return;
625         }               
626         client_pid->ctdb   = ctdb;
627         client_pid->pid    = cr.pid;
628         client_pid->client = client;
629
630         DLIST_ADD(ctdb->client_pids, client_pid);
631
632         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
633                                          ctdb_daemon_read_cb, client,
634                                          "client-%u", client->pid);
635
636         talloc_set_destructor(client, ctdb_client_destructor);
637         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
638         ctdb->statistics.num_clients++;
639 }
640
641
642
643 /*
644   create a unix domain socket and bind it
645   return a file descriptor open on the socket 
646 */
647 static int ux_socket_bind(struct ctdb_context *ctdb)
648 {
649         struct sockaddr_un addr;
650
651         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
652         if (ctdb->daemon.sd == -1) {
653                 return -1;
654         }
655
656         set_close_on_exec(ctdb->daemon.sd);
657         set_nonblocking(ctdb->daemon.sd);
658
659         memset(&addr, 0, sizeof(addr));
660         addr.sun_family = AF_UNIX;
661         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
662
663         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
664                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
665                 goto failed;
666         }       
667
668         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
669             chmod(ctdb->daemon.name, 0700) != 0) {
670                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
671                 goto failed;
672         } 
673
674
675         if (listen(ctdb->daemon.sd, 100) != 0) {
676                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
677                 goto failed;
678         }
679
680         return 0;
681
682 failed:
683         close(ctdb->daemon.sd);
684         ctdb->daemon.sd = -1;
685         return -1;      
686 }
687
688 static void sig_child_handler(struct event_context *ev,
689         struct signal_event *se, int signum, int count,
690         void *dont_care, 
691         void *private_data)
692 {
693 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
694         int status;
695         pid_t pid = -1;
696
697         while (pid != 0) {
698                 pid = waitpid(-1, &status, WNOHANG);
699                 if (pid == -1) {
700                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
701                         return;
702                 }
703                 if (pid > 0) {
704                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
705                 }
706         }
707 }
708
709 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
710                                       void *private_data)
711 {
712         if (status != 0) {
713                 ctdb_fatal(ctdb, "Failed to run setup event\n");
714                 return;
715         }
716         ctdb_run_notification_script(ctdb, "setup");
717
718         /* tell all other nodes we've just started up */
719         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
720                                  0, CTDB_CONTROL_STARTUP, 0,
721                                  CTDB_CTRL_FLAG_NOREPLY,
722                                  tdb_null, NULL, NULL);
723 }
724
725 /*
726   start the protocol going as a daemon
727 */
728 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
729 {
730         int res, ret = -1;
731         struct fd_event *fde;
732         const char *domain_socket_name;
733         struct signal_event *se;
734
735         /* get rid of any old sockets */
736         unlink(ctdb->daemon.name);
737
738         /* create a unix domain stream socket to listen to */
739         res = ux_socket_bind(ctdb);
740         if (res!=0) {
741                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
742                 exit(10);
743         }
744
745         if (do_fork && fork()) {
746                 return 0;
747         }
748
749         tdb_reopen_all(False);
750
751         if (do_fork) {
752                 setsid();
753                 close(0);
754                 if (open("/dev/null", O_RDONLY) != 0) {
755                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
756                         exit(11);
757                 }
758         }
759         block_signal(SIGPIPE);
760
761         ctdbd_pid = getpid();
762
763
764         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
765
766         ctdb_high_priority(ctdb);
767
768         /* ensure the socket is deleted on exit of the daemon */
769         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
770         if (domain_socket_name == NULL) {
771                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
772                 exit(12);
773         }
774
775         ctdb->ev = event_context_init(NULL);
776
777         ctdb_set_child_logging(ctdb);
778
779         /* force initial recovery for election */
780         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
781
782         if (strcmp(ctdb->transport, "tcp") == 0) {
783                 int ctdb_tcp_init(struct ctdb_context *);
784                 ret = ctdb_tcp_init(ctdb);
785         }
786 #ifdef USE_INFINIBAND
787         if (strcmp(ctdb->transport, "ib") == 0) {
788                 int ctdb_ibw_init(struct ctdb_context *);
789                 ret = ctdb_ibw_init(ctdb);
790         }
791 #endif
792         if (ret != 0) {
793                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
794                 return -1;
795         }
796
797         if (ctdb->methods == NULL) {
798                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
799                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
800         }
801
802         /* initialise the transport  */
803         if (ctdb->methods->initialise(ctdb) != 0) {
804                 ctdb_fatal(ctdb, "transport failed to initialise");
805         }
806
807         /* attach to existing databases */
808         if (ctdb_attach_databases(ctdb) != 0) {
809                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
810         }
811
812         /* start frozen, then let the first election sort things out */
813         if (ctdb_blocking_freeze(ctdb)) {
814                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
815         }
816
817         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
818         if (ret != 0) {
819                 ctdb_fatal(ctdb, "Failed to run init event\n");
820         }
821         ctdb_run_notification_script(ctdb, "init");
822
823         /* now start accepting clients, only can do this once frozen */
824         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
825                            EVENT_FD_READ,
826                            ctdb_accept_client, ctdb);
827         tevent_fd_set_auto_close(fde);
828
829         /* release any IPs we hold from previous runs of the daemon */
830         ctdb_release_all_ips(ctdb);
831
832         /* start the transport going */
833         ctdb_start_transport(ctdb);
834
835         /* set up a handler to pick up sigchld */
836         se = event_add_signal(ctdb->ev, ctdb,
837                                      SIGCHLD, 0,
838                                      sig_child_handler,
839                                      ctdb);
840         if (se == NULL) {
841                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
842                 exit(1);
843         }
844
845         ret = ctdb_event_script_callback(ctdb,
846                                          ctdb,
847                                          ctdb_setup_event_callback,
848                                          ctdb,
849                                          false,
850                                          CTDB_EVENT_SETUP,
851                                          "");
852         if (ret != 0) {
853                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
854                 exit(1);
855         }
856
857         if (use_syslog) {
858                 if (start_syslog_daemon(ctdb)) {
859                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
860                         exit(10);
861                 }
862         }
863
864         ctdb_lockdown_memory(ctdb);
865           
866         /* go into a wait loop to allow other nodes to complete */
867         event_loop_wait(ctdb->ev);
868
869         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
870         exit(1);
871 }
872
873 /*
874   allocate a packet for use in daemon<->daemon communication
875  */
876 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
877                                                  TALLOC_CTX *mem_ctx, 
878                                                  enum ctdb_operation operation, 
879                                                  size_t length, size_t slength,
880                                                  const char *type)
881 {
882         int size;
883         struct ctdb_req_header *hdr;
884
885         length = MAX(length, slength);
886         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
887
888         if (ctdb->methods == NULL) {
889                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
890                          operation, (unsigned)length));
891                 return NULL;
892         }
893
894         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
895         if (hdr == NULL) {
896                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
897                          operation, (unsigned)length));
898                 return NULL;
899         }
900         talloc_set_name_const(hdr, type);
901         memset(hdr, 0, slength);
902         hdr->length       = length;
903         hdr->operation    = operation;
904         hdr->ctdb_magic   = CTDB_MAGIC;
905         hdr->ctdb_version = CTDB_VERSION;
906         hdr->generation   = ctdb->vnn_map->generation;
907         hdr->srcnode      = ctdb->pnn;
908
909         return hdr;     
910 }
911
912 struct daemon_control_state {
913         struct daemon_control_state *next, *prev;
914         struct ctdb_client *client;
915         struct ctdb_req_control *c;
916         uint32_t reqid;
917         struct ctdb_node *node;
918 };
919
920 /*
921   callback when a control reply comes in
922  */
923 static void daemon_control_callback(struct ctdb_context *ctdb,
924                                     int32_t status, TDB_DATA data, 
925                                     const char *errormsg,
926                                     void *private_data)
927 {
928         struct daemon_control_state *state = talloc_get_type(private_data, 
929                                                              struct daemon_control_state);
930         struct ctdb_client *client = state->client;
931         struct ctdb_reply_control *r;
932         size_t len;
933         int ret;
934
935         /* construct a message to send to the client containing the data */
936         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
937         if (errormsg) {
938                 len += strlen(errormsg);
939         }
940         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
941                                struct ctdb_reply_control);
942         CTDB_NO_MEMORY_VOID(ctdb, r);
943
944         r->hdr.reqid     = state->reqid;
945         r->status        = status;
946         r->datalen       = data.dsize;
947         r->errorlen = 0;
948         memcpy(&r->data[0], data.dptr, data.dsize);
949         if (errormsg) {
950                 r->errorlen = strlen(errormsg);
951                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
952         }
953
954         ret = daemon_queue_send(client, &r->hdr);
955         if (ret != -1) {
956                 talloc_free(state);
957         }
958 }
959
960 /*
961   fail all pending controls to a disconnected node
962  */
963 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
964 {
965         struct daemon_control_state *state;
966         while ((state = node->pending_controls)) {
967                 DLIST_REMOVE(node->pending_controls, state);
968                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
969                                         "node is disconnected", state);
970         }
971 }
972
973 /*
974   destroy a daemon_control_state
975  */
976 static int daemon_control_destructor(struct daemon_control_state *state)
977 {
978         if (state->node) {
979                 DLIST_REMOVE(state->node->pending_controls, state);
980         }
981         return 0;
982 }
983
984 /*
985   this is called when the ctdb daemon received a ctdb request control
986   from a local client over the unix domain socket
987  */
988 static void daemon_request_control_from_client(struct ctdb_client *client, 
989                                                struct ctdb_req_control *c)
990 {
991         TDB_DATA data;
992         int res;
993         struct daemon_control_state *state;
994         TALLOC_CTX *tmp_ctx = talloc_new(client);
995
996         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
997                 c->hdr.destnode = client->ctdb->pnn;
998         }
999
1000         state = talloc(client, struct daemon_control_state);
1001         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1002
1003         state->client = client;
1004         state->c = talloc_steal(state, c);
1005         state->reqid = c->hdr.reqid;
1006         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1007                 state->node = client->ctdb->nodes[c->hdr.destnode];
1008                 DLIST_ADD(state->node->pending_controls, state);
1009         } else {
1010                 state->node = NULL;
1011         }
1012
1013         talloc_set_destructor(state, daemon_control_destructor);
1014
1015         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1016                 talloc_steal(tmp_ctx, state);
1017         }
1018         
1019         data.dptr = &c->data[0];
1020         data.dsize = c->datalen;
1021         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1022                                        c->srvid, c->opcode, client->client_id,
1023                                        c->flags,
1024                                        data, daemon_control_callback,
1025                                        state);
1026         if (res != 0) {
1027                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1028                          c->hdr.destnode));
1029         }
1030
1031         talloc_free(tmp_ctx);
1032 }
1033
1034 /*
1035   register a call function
1036 */
1037 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1038                          ctdb_fn_t fn, int id)
1039 {
1040         struct ctdb_registered_call *call;
1041         struct ctdb_db_context *ctdb_db;
1042
1043         ctdb_db = find_ctdb_db(ctdb, db_id);
1044         if (ctdb_db == NULL) {
1045                 return -1;
1046         }
1047
1048         call = talloc(ctdb_db, struct ctdb_registered_call);
1049         call->fn = fn;
1050         call->id = id;
1051
1052         DLIST_ADD(ctdb_db->calls, call);        
1053         return 0;
1054 }
1055
1056
1057
1058 /*
1059   this local messaging handler is ugly, but is needed to prevent
1060   recursion in ctdb_send_message() when the destination node is the
1061   same as the source node
1062  */
1063 struct ctdb_local_message {
1064         struct ctdb_context *ctdb;
1065         uint64_t srvid;
1066         TDB_DATA data;
1067 };
1068
1069 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1070                                        struct timeval t, void *private_data)
1071 {
1072         struct ctdb_local_message *m = talloc_get_type(private_data, 
1073                                                        struct ctdb_local_message);
1074         int res;
1075
1076         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1077         if (res != 0) {
1078                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1079                           (unsigned long long)m->srvid));
1080         }
1081         talloc_free(m);
1082 }
1083
1084 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1085 {
1086         struct ctdb_local_message *m;
1087         m = talloc(ctdb, struct ctdb_local_message);
1088         CTDB_NO_MEMORY(ctdb, m);
1089
1090         m->ctdb = ctdb;
1091         m->srvid = srvid;
1092         m->data  = data;
1093         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1094         if (m->data.dptr == NULL) {
1095                 talloc_free(m);
1096                 return -1;
1097         }
1098
1099         /* this needs to be done as an event to prevent recursion */
1100         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1101         return 0;
1102 }
1103
1104 /*
1105   send a ctdb message
1106 */
1107 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1108                              uint64_t srvid, TDB_DATA data)
1109 {
1110         struct ctdb_req_message *r;
1111         int len;
1112
1113         if (ctdb->methods == NULL) {
1114                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1115                 return -1;
1116         }
1117
1118         /* see if this is a message to ourselves */
1119         if (pnn == ctdb->pnn) {
1120                 return ctdb_local_message(ctdb, srvid, data);
1121         }
1122
1123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1124         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1125                                     struct ctdb_req_message);
1126         CTDB_NO_MEMORY(ctdb, r);
1127
1128         r->hdr.destnode  = pnn;
1129         r->srvid         = srvid;
1130         r->datalen       = data.dsize;
1131         memcpy(&r->data[0], data.dptr, data.dsize);
1132
1133         ctdb_queue_packet(ctdb, &r->hdr);
1134
1135         talloc_free(r);
1136         return 0;
1137 }
1138
1139
1140
1141 struct ctdb_client_notify_list {
1142         struct ctdb_client_notify_list *next, *prev;
1143         struct ctdb_context *ctdb;
1144         uint64_t srvid;
1145         TDB_DATA data;
1146 };
1147
1148
1149 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1150 {
1151         int ret;
1152
1153         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1154
1155         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1156         if (ret != 0) {
1157                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1158         }
1159
1160         return 0;
1161 }
1162
1163 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1164 {
1165         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1166         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1167         struct ctdb_client_notify_list *nl;
1168
1169         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1170
1171         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1172                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1173                 return -1;
1174         }
1175
1176         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1177                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1178                 return -1;
1179         }
1180
1181
1182         if (client == NULL) {
1183                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1184                 return -1;
1185         }
1186
1187         for(nl=client->notify; nl; nl=nl->next) {
1188                 if (nl->srvid == notify->srvid) {
1189                         break;
1190                 }
1191         }
1192         if (nl != NULL) {
1193                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1194                 return -1;
1195         }
1196
1197         nl = talloc(client, struct ctdb_client_notify_list);
1198         CTDB_NO_MEMORY(ctdb, nl);
1199         nl->ctdb       = ctdb;
1200         nl->srvid      = notify->srvid;
1201         nl->data.dsize = notify->len;
1202         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1203         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1204         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1205         
1206         DLIST_ADD(client->notify, nl);
1207         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1208
1209         return 0;
1210 }
1211
1212 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1213 {
1214         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1215         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1216         struct ctdb_client_notify_list *nl;
1217
1218         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1219
1220         if (client == NULL) {
1221                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1222                 return -1;
1223         }
1224
1225         for(nl=client->notify; nl; nl=nl->next) {
1226                 if (nl->srvid == notify->srvid) {
1227                         break;
1228                 }
1229         }
1230         if (nl == NULL) {
1231                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1232                 return -1;
1233         }
1234
1235         DLIST_REMOVE(client->notify, nl);
1236         talloc_set_destructor(nl, NULL);
1237         talloc_free(nl);
1238
1239         return 0;
1240 }
1241
1242 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1243 {
1244         struct ctdb_client_pid_list *client_pid;
1245
1246         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1247                 if (client_pid->pid == pid) {
1248                         return client_pid->client;
1249                 }
1250         }
1251         return NULL;
1252 }
1253
1254
1255 /* This control is used by samba when probing if a process (of a samba daemon)
1256    exists on the node.
1257    Samba does this when it needs/wants to check if a subrecord in one of the
1258    databases is still valied, or if it is stale and can be removed.
1259    If the node is in unhealthy or stopped state we just kill of the samba
1260    process holding htis sub-record and return to the calling samba that
1261    the process does not exist.
1262    This allows us to forcefully recall subrecords registered by samba processes
1263    on banned and stopped nodes.
1264 */
1265 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1266 {
1267         struct ctdb_client *client;
1268
1269         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1270                 client = ctdb_find_client_by_pid(ctdb, pid);
1271                 if (client != NULL) {
1272                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1273                         talloc_free(client);
1274                 }
1275                 return -1;
1276         }
1277
1278         return kill(pid, 0);
1279 }