Make fetch_locked more scalable
[samba.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start periodic cleanup of holdback cleanup */
79         ctdb_start_holdback_cleanup(ctdb);
80
81         /* start listening for recovery daemon pings */
82         ctdb_control_recd_ping(ctdb);
83 }
84
85 static void block_signal(int signum)
86 {
87         struct sigaction act;
88
89         memset(&act, 0, sizeof(act));
90
91         act.sa_handler = SIG_IGN;
92         sigemptyset(&act.sa_mask);
93         sigaddset(&act.sa_mask, signum);
94         sigaction(signum, &act, NULL);
95 }
96
97
98 /*
99   send a packet to a client
100  */
101 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
102 {
103         client->ctdb->statistics.client_packets_sent++;
104         if (hdr->operation == CTDB_REQ_MESSAGE) {
105                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
106                         DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
107                         return 0;
108                 }
109         }
110         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
111 }
112
113 /*
114   message handler for when we are in daemon mode. This redirects the message
115   to the right client
116  */
117 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
118                                     TDB_DATA data, void *private_data)
119 {
120         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
121         struct ctdb_req_message *r;
122         int len;
123
124         /* construct a message to send to the client containing the data */
125         len = offsetof(struct ctdb_req_message, data) + data.dsize;
126         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
127                                len, struct ctdb_req_message);
128         CTDB_NO_MEMORY_VOID(ctdb, r);
129
130         talloc_set_name_const(r, "req_message packet");
131
132         r->srvid         = srvid;
133         r->datalen       = data.dsize;
134         memcpy(&r->data[0], data.dptr, data.dsize);
135
136         daemon_queue_send(client, &r->hdr);
137
138         talloc_free(r);
139 }
140
141 /*
142   this is called when the ctdb daemon received a ctdb request to 
143   set the srvid from the client
144  */
145 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
146 {
147         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
148         int res;
149         if (client == NULL) {
150                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151                 return -1;
152         }
153         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
154         if (res != 0) {
155                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
156                          (unsigned long long)srvid));
157         } else {
158                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
159                          (unsigned long long)srvid));
160         }
161
162         return res;
163 }
164
165 /*
166   this is called when the ctdb daemon received a ctdb request to 
167   remove a srvid from the client
168  */
169 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
170 {
171         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
172         if (client == NULL) {
173                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174                 return -1;
175         }
176         return ctdb_deregister_message_handler(ctdb, srvid, client);
177 }
178
179
180 /*
181   destroy a ctdb_client
182 */
183 static int ctdb_client_destructor(struct ctdb_client *client)
184 {
185         struct ctdb_db_context *ctdb_db;
186
187         ctdb_takeover_client_destructor_hook(client);
188         ctdb_reqid_remove(client->ctdb, client->client_id);
189         if (client->ctdb->statistics.num_clients) {
190                 client->ctdb->statistics.num_clients--;
191         }
192
193         if (client->num_persistent_updates != 0) {
194                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
195                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
196         }
197         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
198         if (ctdb_db) {
199                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
200                                   "commit active. Forcing recovery.\n"));
201                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
202                 ctdb_db->transaction_active = false;
203         }
204
205         return 0;
206 }
207
208
209 /*
210   this is called when the ctdb daemon received a ctdb request message
211   from a local client over the unix domain socket
212  */
213 static void daemon_request_message_from_client(struct ctdb_client *client, 
214                                                struct ctdb_req_message *c)
215 {
216         TDB_DATA data;
217         int res;
218
219         /* maybe the message is for another client on this node */
220         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
221                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
222                 return;
223         }
224
225         /* its for a remote node */
226         data.dptr = &c->data[0];
227         data.dsize = c->datalen;
228         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229                                        c->srvid, data);
230         if (res != 0) {
231                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
232                          c->hdr.destnode));
233         }
234 }
235
236
237 struct daemon_call_state {
238         struct ctdb_client *client;
239         uint32_t reqid;
240         struct ctdb_call *call;
241         struct timeval start_time;
242 };
243
244 /* 
245    complete a call from a client 
246 */
247 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
248 {
249         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
250                                                            struct daemon_call_state);
251         struct ctdb_reply_call *r;
252         int res;
253         uint32_t length;
254         struct ctdb_client *client = dstate->client;
255         struct ctdb_db_context *ctdb_db = state->ctdb_db;
256
257         talloc_steal(client, dstate);
258         talloc_steal(dstate, dstate->call);
259
260         res = ctdb_daemon_call_recv(state, dstate->call);
261         if (res != 0) {
262                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
263                 if (client->ctdb->statistics.pending_calls > 0) {
264                         client->ctdb->statistics.pending_calls--;
265                 }
266                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
267                 return;
268         }
269
270         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
271         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
272                                length, struct ctdb_reply_call);
273         if (r == NULL) {
274                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
275                 if (client->ctdb->statistics.pending_calls > 0) {
276                         client->ctdb->statistics.pending_calls--;
277                 }
278                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
279                 return;
280         }
281         r->hdr.reqid        = dstate->reqid;
282         r->datalen          = dstate->call->reply_data.dsize;
283         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
284
285         res = daemon_queue_send(client, &r->hdr);
286         if (res != 0) {
287                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
288         }
289         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
290         talloc_free(dstate);
291         if (client->ctdb->statistics.pending_calls > 0) {
292                 client->ctdb->statistics.pending_calls--;
293         }
294 }
295
296 struct ctdb_daemon_packet_wrap {
297         struct ctdb_context *ctdb;
298         uint32_t client_id;
299 };
300
301 /*
302   a wrapper to catch disconnected clients
303  */
304 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
305 {
306         struct ctdb_client *client;
307         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
308                                                             struct ctdb_daemon_packet_wrap);
309         if (w == NULL) {
310                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
311                 return;
312         }
313
314         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
315         if (client == NULL) {
316                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
317                          w->client_id));
318                 talloc_free(w);
319                 return;
320         }
321         talloc_free(w);
322
323         /* process it */
324         daemon_incoming_packet(client, hdr);    
325 }
326
327
328 /*
329   this is called when the ctdb daemon received a ctdb request call
330   from a local client over the unix domain socket
331  */
332 static void daemon_request_call_from_client(struct ctdb_client *client, 
333                                             struct ctdb_req_call *c)
334 {
335         struct ctdb_call_state *state;
336         struct ctdb_db_context *ctdb_db;
337         struct daemon_call_state *dstate;
338         struct ctdb_call *call;
339         struct ctdb_ltdb_header header;
340         TDB_DATA key, data;
341         int ret;
342         struct ctdb_context *ctdb = client->ctdb;
343         struct ctdb_daemon_packet_wrap *w;
344
345         ctdb->statistics.total_calls++;
346         if (client->ctdb->statistics.pending_calls > 0) {
347                 ctdb->statistics.pending_calls++;
348         }
349
350         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
351         if (!ctdb_db) {
352                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
353                           c->db_id));
354                 if (client->ctdb->statistics.pending_calls > 0) {
355                         ctdb->statistics.pending_calls--;
356                 }
357                 return;
358         }
359
360         key.dptr = c->data;
361         key.dsize = c->keylen;
362
363         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
364         CTDB_NO_MEMORY_VOID(ctdb, w);   
365
366         w->ctdb = ctdb;
367         w->client_id = client->client_id;
368
369         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
370                                            (struct ctdb_req_header *)c, &data,
371                                            daemon_incoming_packet_wrap, w, True);
372         if (ret == -2) {
373                 /* will retry later */
374                 if (client->ctdb->statistics.pending_calls > 0) {
375                         ctdb->statistics.pending_calls--;
376                 }
377                 return;
378         }
379
380         talloc_free(w);
381
382         if (ret != 0) {
383                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
384                 if (client->ctdb->statistics.pending_calls > 0) {
385                         ctdb->statistics.pending_calls--;
386                 }
387                 return;
388         }
389
390         dstate = talloc(client, struct daemon_call_state);
391         if (dstate == NULL) {
392                 ctdb_ltdb_unlock(ctdb_db, key);
393                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
394                 if (client->ctdb->statistics.pending_calls > 0) {
395                         ctdb->statistics.pending_calls--;
396                 }
397                 return;
398         }
399         dstate->start_time = timeval_current();
400         dstate->client = client;
401         dstate->reqid  = c->hdr.reqid;
402         talloc_steal(dstate, data.dptr);
403
404         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
405         if (call == NULL) {
406                 ctdb_ltdb_unlock(ctdb_db, key);
407                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
408                 if (client->ctdb->statistics.pending_calls > 0) {
409                         ctdb->statistics.pending_calls--;
410                 }
411                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
412                 return;
413         }
414
415         call->call_id = c->callid;
416         call->key = key;
417         call->call_data.dptr = c->data + c->keylen;
418         call->call_data.dsize = c->calldatalen;
419         call->flags = c->flags;
420
421         if (header.dmaster == ctdb->pnn) {
422                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
423         } else {
424                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
425         }
426
427         ctdb_ltdb_unlock(ctdb_db, key);
428
429         if (state == NULL) {
430                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
431                 if (client->ctdb->statistics.pending_calls > 0) {
432                         ctdb->statistics.pending_calls--;
433                 }
434                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
435                 return;
436         }
437         talloc_steal(state, dstate);
438         talloc_steal(client, state);
439
440         state->async.fn = daemon_call_from_client_callback;
441         state->async.private_data = dstate;
442 }
443
444
445 static void daemon_request_control_from_client(struct ctdb_client *client, 
446                                                struct ctdb_req_control *c);
447
448 /* data contains a packet from the client */
449 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
450 {
451         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
452         TALLOC_CTX *tmp_ctx;
453         struct ctdb_context *ctdb = client->ctdb;
454
455         /* place the packet as a child of a tmp_ctx. We then use
456            talloc_free() below to free it. If any of the calls want
457            to keep it, then they will steal it somewhere else, and the
458            talloc_free() will be a no-op */
459         tmp_ctx = talloc_new(client);
460         talloc_steal(tmp_ctx, hdr);
461
462         if (hdr->ctdb_magic != CTDB_MAGIC) {
463                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
464                 goto done;
465         }
466
467         if (hdr->ctdb_version != CTDB_VERSION) {
468                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
469                 goto done;
470         }
471
472         switch (hdr->operation) {
473         case CTDB_REQ_CALL:
474                 ctdb->statistics.client.req_call++;
475                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
476                 break;
477
478         case CTDB_REQ_MESSAGE:
479                 ctdb->statistics.client.req_message++;
480                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
481                 break;
482
483         case CTDB_REQ_CONTROL:
484                 ctdb->statistics.client.req_control++;
485                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
486                 break;
487
488         default:
489                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
490                          hdr->operation));
491         }
492
493 done:
494         talloc_free(tmp_ctx);
495 }
496
497 /*
498   called when the daemon gets a incoming packet
499  */
500 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
501 {
502         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
503         struct ctdb_req_header *hdr;
504
505         if (cnt == 0) {
506                 talloc_free(client);
507                 return;
508         }
509
510         client->ctdb->statistics.client_packets_recv++;
511
512         if (cnt < sizeof(*hdr)) {
513                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
514                                (unsigned)cnt);
515                 return;
516         }
517         hdr = (struct ctdb_req_header *)data;
518         if (cnt != hdr->length) {
519                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
520                                (unsigned)hdr->length, (unsigned)cnt);
521                 return;
522         }
523
524         if (hdr->ctdb_magic != CTDB_MAGIC) {
525                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
526                 return;
527         }
528
529         if (hdr->ctdb_version != CTDB_VERSION) {
530                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
531                 return;
532         }
533
534         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
535                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
536                  hdr->srcnode, hdr->destnode));
537
538         /* it is the responsibility of the incoming packet function to free 'data' */
539         daemon_incoming_packet(client, hdr);
540 }
541
542
543 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
544 {
545         if (client_pid->ctdb->client_pids != NULL) {
546                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
547         }
548
549         return 0;
550 }
551
552
553 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
554                          uint16_t flags, void *private_data)
555 {
556         struct sockaddr_un addr;
557         socklen_t len;
558         int fd;
559         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
560         struct ctdb_client *client;
561         struct ctdb_client_pid_list *client_pid;
562 #ifdef _AIX
563         struct peercred_struct cr;
564         socklen_t crl = sizeof(struct peercred_struct);
565 #else
566         struct ucred cr;
567         socklen_t crl = sizeof(struct ucred);
568 #endif
569
570         memset(&addr, 0, sizeof(addr));
571         len = sizeof(addr);
572         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
573         if (fd == -1) {
574                 return;
575         }
576
577         set_nonblocking(fd);
578         set_close_on_exec(fd);
579
580         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
581
582         client = talloc_zero(ctdb, struct ctdb_client);
583 #ifdef _AIX
584         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
585 #else
586         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
587 #endif
588                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
589         }
590
591         client->ctdb = ctdb;
592         client->fd = fd;
593         client->client_id = ctdb_reqid_new(ctdb, client);
594         client->pid = cr.pid;
595
596         client_pid = talloc(client, struct ctdb_client_pid_list);
597         if (client_pid == NULL) {
598                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
599                 close(fd);
600                 talloc_free(client);
601                 return;
602         }               
603         client_pid->ctdb   = ctdb;
604         client_pid->pid    = cr.pid;
605         client_pid->client = client;
606
607         DLIST_ADD(ctdb->client_pids, client_pid);
608
609         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
610                                          ctdb_daemon_read_cb, client);
611
612         talloc_set_destructor(client, ctdb_client_destructor);
613         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
614         ctdb->statistics.num_clients++;
615 }
616
617
618
619 /*
620   create a unix domain socket and bind it
621   return a file descriptor open on the socket 
622 */
623 static int ux_socket_bind(struct ctdb_context *ctdb)
624 {
625         struct sockaddr_un addr;
626
627         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
628         if (ctdb->daemon.sd == -1) {
629                 return -1;
630         }
631
632         set_close_on_exec(ctdb->daemon.sd);
633         set_nonblocking(ctdb->daemon.sd);
634
635         memset(&addr, 0, sizeof(addr));
636         addr.sun_family = AF_UNIX;
637         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
638
639         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
640                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
641                 goto failed;
642         }       
643
644         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
645             chmod(ctdb->daemon.name, 0700) != 0) {
646                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
647                 goto failed;
648         } 
649
650
651         if (listen(ctdb->daemon.sd, 100) != 0) {
652                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
653                 goto failed;
654         }
655
656         return 0;
657
658 failed:
659         close(ctdb->daemon.sd);
660         ctdb->daemon.sd = -1;
661         return -1;      
662 }
663
664 static void sig_child_handler(struct event_context *ev,
665         struct signal_event *se, int signum, int count,
666         void *dont_care, 
667         void *private_data)
668 {
669 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
670         int status;
671         pid_t pid = -1;
672
673         while (pid != 0) {
674                 pid = waitpid(-1, &status, WNOHANG);
675                 if (pid == -1) {
676                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
677                         return;
678                 }
679                 if (pid > 0) {
680                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
681                 }
682         }
683 }
684
685 /*
686   start the protocol going as a daemon
687 */
688 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
689 {
690         int res, ret = -1;
691         struct fd_event *fde;
692         const char *domain_socket_name;
693         struct signal_event *se;
694
695         /* get rid of any old sockets */
696         unlink(ctdb->daemon.name);
697
698         /* create a unix domain stream socket to listen to */
699         res = ux_socket_bind(ctdb);
700         if (res!=0) {
701                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
702                 exit(10);
703         }
704
705         if (do_fork && fork()) {
706                 return 0;
707         }
708
709         tdb_reopen_all(False);
710
711         if (do_fork) {
712                 setsid();
713                 close(0);
714                 if (open("/dev/null", O_RDONLY) != 0) {
715                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
716                         exit(11);
717                 }
718         }
719         block_signal(SIGPIPE);
720
721         ctdbd_pid = getpid();
722
723
724         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
725
726         if (ctdb->do_setsched) {
727                 /* try to set us up as realtime */
728                 ctdb_set_scheduler(ctdb);
729         }
730
731         /* ensure the socket is deleted on exit of the daemon */
732         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
733         if (domain_socket_name == NULL) {
734                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
735                 exit(12);
736         }
737
738         ctdb->ev = event_context_init(NULL);
739
740         ctdb_set_child_logging(ctdb);
741
742         /* force initial recovery for election */
743         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
744
745         if (strcmp(ctdb->transport, "tcp") == 0) {
746                 int ctdb_tcp_init(struct ctdb_context *);
747                 ret = ctdb_tcp_init(ctdb);
748         }
749 #ifdef USE_INFINIBAND
750         if (strcmp(ctdb->transport, "ib") == 0) {
751                 int ctdb_ibw_init(struct ctdb_context *);
752                 ret = ctdb_ibw_init(ctdb);
753         }
754 #endif
755         if (ret != 0) {
756                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
757                 return -1;
758         }
759
760         if (ctdb->methods == NULL) {
761                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
762                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
763         }
764
765         /* initialise the transport  */
766         if (ctdb->methods->initialise(ctdb) != 0) {
767                 ctdb_fatal(ctdb, "transport failed to initialise");
768         }
769
770         /* attach to any existing persistent databases */
771         if (ctdb_attach_persistent(ctdb) != 0) {
772                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
773         }
774
775         /* start frozen, then let the first election sort things out */
776         if (ctdb_blocking_freeze(ctdb)) {
777                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
778         }
779
780         /* now start accepting clients, only can do this once frozen */
781         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
782                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
783                            ctdb_accept_client, ctdb);
784
785         /* tell all other nodes we've just started up */
786         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
787                                  0, CTDB_CONTROL_STARTUP, 0,
788                                  CTDB_CTRL_FLAG_NOREPLY,
789                                  tdb_null, NULL, NULL);
790
791         /* release any IPs we hold from previous runs of the daemon */
792         ctdb_release_all_ips(ctdb);
793
794         /* start the transport going */
795         ctdb_start_transport(ctdb);
796
797         /* set up a handler to pick up sigchld */
798         se = event_add_signal(ctdb->ev, ctdb,
799                                      SIGCHLD, 0,
800                                      sig_child_handler,
801                                      ctdb);
802         if (se == NULL) {
803                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
804                 exit(1);
805         }
806
807         if (use_syslog) {
808                 if (start_syslog_daemon(ctdb)) {
809                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
810                         exit(10);
811                 }
812         }
813
814           
815         /* go into a wait loop to allow other nodes to complete */
816         event_loop_wait(ctdb->ev);
817
818         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
819         exit(1);
820 }
821
822 /*
823   allocate a packet for use in daemon<->daemon communication
824  */
825 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
826                                                  TALLOC_CTX *mem_ctx, 
827                                                  enum ctdb_operation operation, 
828                                                  size_t length, size_t slength,
829                                                  const char *type)
830 {
831         int size;
832         struct ctdb_req_header *hdr;
833
834         length = MAX(length, slength);
835         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
836
837         if (ctdb->methods == NULL) {
838                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
839                          operation, (unsigned)length));
840                 return NULL;
841         }
842
843         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
844         if (hdr == NULL) {
845                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
846                          operation, (unsigned)length));
847                 return NULL;
848         }
849         talloc_set_name_const(hdr, type);
850         memset(hdr, 0, slength);
851         hdr->length       = length;
852         hdr->operation    = operation;
853         hdr->ctdb_magic   = CTDB_MAGIC;
854         hdr->ctdb_version = CTDB_VERSION;
855         hdr->generation   = ctdb->vnn_map->generation;
856         hdr->srcnode      = ctdb->pnn;
857
858         return hdr;     
859 }
860
861 struct daemon_control_state {
862         struct daemon_control_state *next, *prev;
863         struct ctdb_client *client;
864         struct ctdb_req_control *c;
865         uint32_t reqid;
866         struct ctdb_node *node;
867 };
868
869 /*
870   callback when a control reply comes in
871  */
872 static void daemon_control_callback(struct ctdb_context *ctdb,
873                                     int32_t status, TDB_DATA data, 
874                                     const char *errormsg,
875                                     void *private_data)
876 {
877         struct daemon_control_state *state = talloc_get_type(private_data, 
878                                                              struct daemon_control_state);
879         struct ctdb_client *client = state->client;
880         struct ctdb_reply_control *r;
881         size_t len;
882
883         /* construct a message to send to the client containing the data */
884         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
885         if (errormsg) {
886                 len += strlen(errormsg);
887         }
888         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
889                                struct ctdb_reply_control);
890         CTDB_NO_MEMORY_VOID(ctdb, r);
891
892         r->hdr.reqid     = state->reqid;
893         r->status        = status;
894         r->datalen       = data.dsize;
895         r->errorlen = 0;
896         memcpy(&r->data[0], data.dptr, data.dsize);
897         if (errormsg) {
898                 r->errorlen = strlen(errormsg);
899                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
900         }
901
902         daemon_queue_send(client, &r->hdr);
903
904         talloc_free(state);
905 }
906
907 /*
908   fail all pending controls to a disconnected node
909  */
910 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
911 {
912         struct daemon_control_state *state;
913         while ((state = node->pending_controls)) {
914                 DLIST_REMOVE(node->pending_controls, state);
915                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
916                                         "node is disconnected", state);
917         }
918 }
919
920 /*
921   destroy a daemon_control_state
922  */
923 static int daemon_control_destructor(struct daemon_control_state *state)
924 {
925         if (state->node) {
926                 DLIST_REMOVE(state->node->pending_controls, state);
927         }
928         return 0;
929 }
930
931 /*
932   this is called when the ctdb daemon received a ctdb request control
933   from a local client over the unix domain socket
934  */
935 static void daemon_request_control_from_client(struct ctdb_client *client, 
936                                                struct ctdb_req_control *c)
937 {
938         TDB_DATA data;
939         int res;
940         struct daemon_control_state *state;
941         TALLOC_CTX *tmp_ctx = talloc_new(client);
942
943         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
944                 c->hdr.destnode = client->ctdb->pnn;
945         }
946
947         state = talloc(client, struct daemon_control_state);
948         CTDB_NO_MEMORY_VOID(client->ctdb, state);
949
950         state->client = client;
951         state->c = talloc_steal(state, c);
952         state->reqid = c->hdr.reqid;
953         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
954                 state->node = client->ctdb->nodes[c->hdr.destnode];
955                 DLIST_ADD(state->node->pending_controls, state);
956         } else {
957                 state->node = NULL;
958         }
959
960         talloc_set_destructor(state, daemon_control_destructor);
961
962         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
963                 talloc_steal(tmp_ctx, state);
964         }
965         
966         data.dptr = &c->data[0];
967         data.dsize = c->datalen;
968         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
969                                        c->srvid, c->opcode, client->client_id,
970                                        c->flags,
971                                        data, daemon_control_callback,
972                                        state);
973         if (res != 0) {
974                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
975                          c->hdr.destnode));
976         }
977
978         talloc_free(tmp_ctx);
979 }
980
981 /*
982   register a call function
983 */
984 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
985                          ctdb_fn_t fn, int id)
986 {
987         struct ctdb_registered_call *call;
988         struct ctdb_db_context *ctdb_db;
989
990         ctdb_db = find_ctdb_db(ctdb, db_id);
991         if (ctdb_db == NULL) {
992                 return -1;
993         }
994
995         call = talloc(ctdb_db, struct ctdb_registered_call);
996         call->fn = fn;
997         call->id = id;
998
999         DLIST_ADD(ctdb_db->calls, call);        
1000         return 0;
1001 }
1002
1003
1004
1005 /*
1006   this local messaging handler is ugly, but is needed to prevent
1007   recursion in ctdb_send_message() when the destination node is the
1008   same as the source node
1009  */
1010 struct ctdb_local_message {
1011         struct ctdb_context *ctdb;
1012         uint64_t srvid;
1013         TDB_DATA data;
1014 };
1015
1016 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1017                                        struct timeval t, void *private_data)
1018 {
1019         struct ctdb_local_message *m = talloc_get_type(private_data, 
1020                                                        struct ctdb_local_message);
1021         int res;
1022
1023         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1024         if (res != 0) {
1025                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1026                           (unsigned long long)m->srvid));
1027         }
1028         talloc_free(m);
1029 }
1030
1031 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1032 {
1033         struct ctdb_local_message *m;
1034         m = talloc(ctdb, struct ctdb_local_message);
1035         CTDB_NO_MEMORY(ctdb, m);
1036
1037         m->ctdb = ctdb;
1038         m->srvid = srvid;
1039         m->data  = data;
1040         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1041         if (m->data.dptr == NULL) {
1042                 talloc_free(m);
1043                 return -1;
1044         }
1045
1046         /* this needs to be done as an event to prevent recursion */
1047         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1048         return 0;
1049 }
1050
1051 /*
1052   send a ctdb message
1053 */
1054 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1055                              uint64_t srvid, TDB_DATA data)
1056 {
1057         struct ctdb_req_message *r;
1058         int len;
1059
1060         if (ctdb->methods == NULL) {
1061                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1062                 return -1;
1063         }
1064
1065         /* see if this is a message to ourselves */
1066         if (pnn == ctdb->pnn) {
1067                 return ctdb_local_message(ctdb, srvid, data);
1068         }
1069
1070         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1071         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1072                                     struct ctdb_req_message);
1073         CTDB_NO_MEMORY(ctdb, r);
1074
1075         r->hdr.destnode  = pnn;
1076         r->srvid         = srvid;
1077         r->datalen       = data.dsize;
1078         memcpy(&r->data[0], data.dptr, data.dsize);
1079
1080         ctdb_queue_packet(ctdb, &r->hdr);
1081
1082         talloc_free(r);
1083         return 0;
1084 }
1085
1086
1087
1088 struct ctdb_client_notify_list {
1089         struct ctdb_client_notify_list *next, *prev;
1090         struct ctdb_context *ctdb;
1091         uint64_t srvid;
1092         TDB_DATA data;
1093 };
1094
1095
1096 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1097 {
1098         int ret;
1099
1100         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1101
1102         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1103         if (ret != 0) {
1104                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1105         }
1106
1107         return 0;
1108 }
1109
1110 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1111 {
1112         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1113         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1114         struct ctdb_client_notify_list *nl;
1115
1116         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1117
1118         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1120                 return -1;
1121         }
1122
1123         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1125                 return -1;
1126         }
1127
1128
1129         if (client == NULL) {
1130                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1131                 return -1;
1132         }
1133
1134         for(nl=client->notify; nl; nl=nl->next) {
1135                 if (nl->srvid == notify->srvid) {
1136                         break;
1137                 }
1138         }
1139         if (nl != NULL) {
1140                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1141                 return -1;
1142         }
1143
1144         nl = talloc(client, struct ctdb_client_notify_list);
1145         CTDB_NO_MEMORY(ctdb, nl);
1146         nl->ctdb       = ctdb;
1147         nl->srvid      = notify->srvid;
1148         nl->data.dsize = notify->len;
1149         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1150         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1151         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1152         
1153         DLIST_ADD(client->notify, nl);
1154         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1155
1156         return 0;
1157 }
1158
1159 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1160 {
1161         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1162         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1163         struct ctdb_client_notify_list *nl;
1164
1165         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1166
1167         if (client == NULL) {
1168                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1169                 return -1;
1170         }
1171
1172         for(nl=client->notify; nl; nl=nl->next) {
1173                 if (nl->srvid == notify->srvid) {
1174                         break;
1175                 }
1176         }
1177         if (nl == NULL) {
1178                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1179                 return -1;
1180         }
1181
1182         DLIST_REMOVE(client->notify, nl);
1183         talloc_set_destructor(nl, NULL);
1184         talloc_free(nl);
1185
1186         return 0;
1187 }
1188
1189 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1190 {
1191         struct ctdb_client_pid_list *client_pid;
1192
1193         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1194                 if (client_pid->pid == pid) {
1195                         return client_pid->client;
1196                 }
1197         }
1198         return NULL;
1199 }
1200
1201
1202 /* This control is used by samba when probing if a process (of a samba daemon)
1203    exists on the node.
1204    Samba does this when it needs/wants to check if a subrecord in one of the
1205    databases is still valied, or if it is stale and can be removed.
1206    If the node is in unhealthy or stopped state we just kill of the samba
1207    process holding htis sub-record and return to the calling samba that
1208    the process does not exist.
1209    This allows us to forcefully recall subrecords registered by samba processes
1210    on banned and stopped nodes.
1211 */
1212 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1213 {
1214         struct ctdb_client *client;
1215
1216         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1217                 client = ctdb_find_client_by_pid(ctdb, pid);
1218                 if (client != NULL) {
1219                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1220                         talloc_free(client);
1221                 }
1222                 return -1;
1223         }
1224
1225         return kill(pid, 0);
1226 }