ReadOnly: Add handlign of readonly requests readwrite requests, delegations and revok...
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46
47
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
49                                   struct timeval t, void *private_data)
50 {
51         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
52
53         if (getpid() != ctdbd_pid) {
54                 return;
55         }
56
57         event_add_timed(ctdb->ev, ctdb, 
58                         timeval_current_ofs(1, 0), 
59                         ctdb_time_tick, ctdb);
60 }
61
62 /* Used to trigger a dummy event once per second, to make
63  * detection of hangs more reliable.
64  */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
66 {
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
75 {
76         if (ctdb->methods == NULL) {
77                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
79         }
80
81         /* start the transport running */
82         if (ctdb->methods->start(ctdb) != 0) {
83                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84                 ctdb_fatal(ctdb, "transport failed to start");
85         }
86
87         /* start the recovery daemon process */
88         if (ctdb_start_recoverd(ctdb) != 0) {
89                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
90                 exit(11);
91         }
92
93         /* Make sure we log something when the daemon terminates */
94         atexit(print_exit_message);
95
96         /* start monitoring for connected/disconnected nodes */
97         ctdb_start_keepalive(ctdb);
98
99         /* start monitoring for node health */
100         ctdb_start_monitoring(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void block_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
146                                     TDB_DATA data, void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message, data) + data.dsize;
154         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
155                                len, struct ctdb_req_message);
156         CTDB_NO_MEMORY_VOID(ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return ctdb_deregister_message_handler(ctdb, srvid, client);
205 }
206
207
208 /*
209   destroy a ctdb_client
210 */
211 static int ctdb_client_destructor(struct ctdb_client *client)
212 {
213         struct ctdb_db_context *ctdb_db;
214
215         ctdb_takeover_client_destructor_hook(client);
216         ctdb_reqid_remove(client->ctdb, client->client_id);
217         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
218
219         if (client->num_persistent_updates != 0) {
220                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
221                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
222         }
223         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
224         if (ctdb_db) {
225                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
226                                   "commit active. Forcing recovery.\n"));
227                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
228
229                 /* legacy trans2 transaction state: */
230                 ctdb_db->transaction_active = false;
231
232                 /*
233                  * trans3 transaction state:
234                  *
235                  * The destructor sets the pointer to NULL.
236                  */
237                 talloc_free(ctdb_db->persistent_state);
238         }
239
240         return 0;
241 }
242
243
244 /*
245   this is called when the ctdb daemon received a ctdb request message
246   from a local client over the unix domain socket
247  */
248 static void daemon_request_message_from_client(struct ctdb_client *client, 
249                                                struct ctdb_req_message *c)
250 {
251         TDB_DATA data;
252         int res;
253
254         /* maybe the message is for another client on this node */
255         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
256                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
257                 return;
258         }
259
260         /* its for a remote node */
261         data.dptr = &c->data[0];
262         data.dsize = c->datalen;
263         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
264                                        c->srvid, data);
265         if (res != 0) {
266                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
267                          c->hdr.destnode));
268         }
269 }
270
271
272 struct daemon_call_state {
273         struct ctdb_client *client;
274         uint32_t reqid;
275         struct ctdb_call *call;
276         struct timeval start_time;
277 };
278
279 /* 
280    complete a call from a client 
281 */
282 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
283 {
284         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
285                                                            struct daemon_call_state);
286         struct ctdb_reply_call *r;
287         int res;
288         uint32_t length;
289         struct ctdb_client *client = dstate->client;
290         struct ctdb_db_context *ctdb_db = state->ctdb_db;
291
292         talloc_steal(client, dstate);
293         talloc_steal(dstate, dstate->call);
294
295         res = ctdb_daemon_call_recv(state, dstate->call);
296         if (res != 0) {
297                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
298                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
299
300                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
301                 return;
302         }
303
304         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
305         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
306                                length, struct ctdb_reply_call);
307         if (r == NULL) {
308                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
309                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
310                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
311                 return;
312         }
313         r->hdr.reqid        = dstate->reqid;
314         r->datalen          = dstate->call->reply_data.dsize;
315         r->status           = dstate->call->status;
316         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
317
318         res = daemon_queue_send(client, &r->hdr);
319         if (res == -1) {
320                 /* client is dead - return immediately */
321                 return;
322         }
323         if (res != 0) {
324                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
325         }
326         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
327         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
328         talloc_free(dstate);
329 }
330
331 struct ctdb_daemon_packet_wrap {
332         struct ctdb_context *ctdb;
333         uint32_t client_id;
334 };
335
336 /*
337   a wrapper to catch disconnected clients
338  */
339 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
340 {
341         struct ctdb_client *client;
342         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
343                                                             struct ctdb_daemon_packet_wrap);
344         if (w == NULL) {
345                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
346                 return;
347         }
348
349         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
350         if (client == NULL) {
351                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
352                          w->client_id));
353                 talloc_free(w);
354                 return;
355         }
356         talloc_free(w);
357
358         /* process it */
359         daemon_incoming_packet(client, hdr);    
360 }
361
362
363 /*
364   this is called when the ctdb daemon received a ctdb request call
365   from a local client over the unix domain socket
366  */
367 static void daemon_request_call_from_client(struct ctdb_client *client, 
368                                             struct ctdb_req_call *c)
369 {
370         struct ctdb_call_state *state;
371         struct ctdb_db_context *ctdb_db;
372         struct daemon_call_state *dstate;
373         struct ctdb_call *call;
374         struct ctdb_ltdb_header header;
375         TDB_DATA key, data;
376         int ret;
377         struct ctdb_context *ctdb = client->ctdb;
378         struct ctdb_daemon_packet_wrap *w;
379
380         CTDB_INCREMENT_STAT(ctdb, total_calls);
381         CTDB_DECREMENT_STAT(ctdb, pending_calls);
382
383         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
384         if (!ctdb_db) {
385                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
386                           c->db_id));
387                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
388                 return;
389         }
390
391         if (ctdb_db->unhealthy_reason) {
392                 /*
393                  * this is just a warning, as the tdb should be empty anyway,
394                  * and only persistent databases can be unhealthy, which doesn't
395                  * use this code patch
396                  */
397                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
398                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
399         }
400
401         key.dptr = c->data;
402         key.dsize = c->keylen;
403
404         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
405         CTDB_NO_MEMORY_VOID(ctdb, w);   
406
407         w->ctdb = ctdb;
408         w->client_id = client->client_id;
409
410         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
411                                            (struct ctdb_req_header *)c, &data,
412                                            daemon_incoming_packet_wrap, w, True);
413         if (ret == -2) {
414                 /* will retry later */
415                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
416                 return;
417         }
418
419         talloc_free(w);
420
421         if (ret != 0) {
422                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
423                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
424                 return;
425         }
426
427         /* Dont do READONLY if we dont have a tracking database */
428         if ((c->flags & CTDB_WANT_READONLY) && ctdb_db->rottdb == NULL) {
429                 c->flags &= ~CTDB_WANT_READONLY;
430         }
431
432         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
433                 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
434                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
435                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
436                 }
437         }
438
439         /* if we are revoking, we must defer all other calls until the revoke
440          * had completed.
441          */
442         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
443                 talloc_free(data.dptr);
444                 ret = ctdb_ltdb_unlock(ctdb_db, key);
445
446                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
447                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
448                 }
449                 return;
450         }
451
452         if ((header.dmaster == ctdb->pnn)
453         && (!(c->flags & CTDB_WANT_READONLY))
454         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
455                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
456                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
457                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
458                 }
459                 ret = ctdb_ltdb_unlock(ctdb_db, key);
460
461                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
462                         ctdb_fatal(ctdb, "Failed to start record revoke");
463                 }
464                 talloc_free(data.dptr);
465
466                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
467                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
468                 }
469
470                 return;
471         }               
472
473         dstate = talloc(client, struct daemon_call_state);
474         if (dstate == NULL) {
475                 ret = ctdb_ltdb_unlock(ctdb_db, key);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
478                 }
479
480                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
481                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
482                 return;
483         }
484         dstate->start_time = timeval_current();
485         dstate->client = client;
486         dstate->reqid  = c->hdr.reqid;
487         talloc_steal(dstate, data.dptr);
488
489         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
490         if (call == NULL) {
491                 ret = ctdb_ltdb_unlock(ctdb_db, key);
492                 if (ret != 0) {
493                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
494                 }
495
496                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
497                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
498                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
499                 return;
500         }
501
502         call->call_id = c->callid;
503         call->key = key;
504         call->call_data.dptr = c->data + c->keylen;
505         call->call_data.dsize = c->calldatalen;
506         call->flags = c->flags;
507
508         if (header.dmaster == ctdb->pnn) {
509                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
510         } else {
511                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
512         }
513
514         ret = ctdb_ltdb_unlock(ctdb_db, key);
515         if (ret != 0) {
516                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
517         }
518
519         if (state == NULL) {
520                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
521                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
522                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
523                 return;
524         }
525         talloc_steal(state, dstate);
526         talloc_steal(client, state);
527
528         state->async.fn = daemon_call_from_client_callback;
529         state->async.private_data = dstate;
530 }
531
532
533 static void daemon_request_control_from_client(struct ctdb_client *client, 
534                                                struct ctdb_req_control *c);
535
536 /* data contains a packet from the client */
537 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
538 {
539         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
540         TALLOC_CTX *tmp_ctx;
541         struct ctdb_context *ctdb = client->ctdb;
542
543         /* place the packet as a child of a tmp_ctx. We then use
544            talloc_free() below to free it. If any of the calls want
545            to keep it, then they will steal it somewhere else, and the
546            talloc_free() will be a no-op */
547         tmp_ctx = talloc_new(client);
548         talloc_steal(tmp_ctx, hdr);
549
550         if (hdr->ctdb_magic != CTDB_MAGIC) {
551                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
552                 goto done;
553         }
554
555         if (hdr->ctdb_version != CTDB_VERSION) {
556                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
557                 goto done;
558         }
559
560         switch (hdr->operation) {
561         case CTDB_REQ_CALL:
562                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
563                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
564                 break;
565
566         case CTDB_REQ_MESSAGE:
567                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
568                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
569                 break;
570
571         case CTDB_REQ_CONTROL:
572                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
573                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
574                 break;
575
576         default:
577                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
578                          hdr->operation));
579         }
580
581 done:
582         talloc_free(tmp_ctx);
583 }
584
585 /*
586   called when the daemon gets a incoming packet
587  */
588 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
589 {
590         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
591         struct ctdb_req_header *hdr;
592
593         if (cnt == 0) {
594                 talloc_free(client);
595                 return;
596         }
597
598         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
599
600         if (cnt < sizeof(*hdr)) {
601                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
602                                (unsigned)cnt);
603                 return;
604         }
605         hdr = (struct ctdb_req_header *)data;
606         if (cnt != hdr->length) {
607                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
608                                (unsigned)hdr->length, (unsigned)cnt);
609                 return;
610         }
611
612         if (hdr->ctdb_magic != CTDB_MAGIC) {
613                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
614                 return;
615         }
616
617         if (hdr->ctdb_version != CTDB_VERSION) {
618                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
619                 return;
620         }
621
622         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
623                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
624                  hdr->srcnode, hdr->destnode));
625
626         /* it is the responsibility of the incoming packet function to free 'data' */
627         daemon_incoming_packet(client, hdr);
628 }
629
630
631 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
632 {
633         if (client_pid->ctdb->client_pids != NULL) {
634                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
635         }
636
637         return 0;
638 }
639
640
641 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
642                          uint16_t flags, void *private_data)
643 {
644         struct sockaddr_un addr;
645         socklen_t len;
646         int fd;
647         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
648         struct ctdb_client *client;
649         struct ctdb_client_pid_list *client_pid;
650 #ifdef _AIX
651         struct peercred_struct cr;
652         socklen_t crl = sizeof(struct peercred_struct);
653 #else
654         struct ucred cr;
655         socklen_t crl = sizeof(struct ucred);
656 #endif
657
658         memset(&addr, 0, sizeof(addr));
659         len = sizeof(addr);
660         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
661         if (fd == -1) {
662                 return;
663         }
664
665         set_nonblocking(fd);
666         set_close_on_exec(fd);
667
668         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
669
670         client = talloc_zero(ctdb, struct ctdb_client);
671 #ifdef _AIX
672         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
673 #else
674         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
675 #endif
676                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
677         }
678
679         client->ctdb = ctdb;
680         client->fd = fd;
681         client->client_id = ctdb_reqid_new(ctdb, client);
682         client->pid = cr.pid;
683
684         client_pid = talloc(client, struct ctdb_client_pid_list);
685         if (client_pid == NULL) {
686                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
687                 close(fd);
688                 talloc_free(client);
689                 return;
690         }               
691         client_pid->ctdb   = ctdb;
692         client_pid->pid    = cr.pid;
693         client_pid->client = client;
694
695         DLIST_ADD(ctdb->client_pids, client_pid);
696
697         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
698                                          ctdb_daemon_read_cb, client,
699                                          "client-%u", client->pid);
700
701         talloc_set_destructor(client, ctdb_client_destructor);
702         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
703         CTDB_INCREMENT_STAT(ctdb, num_clients);
704 }
705
706
707
708 /*
709   create a unix domain socket and bind it
710   return a file descriptor open on the socket 
711 */
712 static int ux_socket_bind(struct ctdb_context *ctdb)
713 {
714         struct sockaddr_un addr;
715
716         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
717         if (ctdb->daemon.sd == -1) {
718                 return -1;
719         }
720
721         set_close_on_exec(ctdb->daemon.sd);
722         set_nonblocking(ctdb->daemon.sd);
723
724         memset(&addr, 0, sizeof(addr));
725         addr.sun_family = AF_UNIX;
726         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
727
728         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
729                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
730                 goto failed;
731         }       
732
733         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
734             chmod(ctdb->daemon.name, 0700) != 0) {
735                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
736                 goto failed;
737         } 
738
739
740         if (listen(ctdb->daemon.sd, 100) != 0) {
741                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
742                 goto failed;
743         }
744
745         return 0;
746
747 failed:
748         close(ctdb->daemon.sd);
749         ctdb->daemon.sd = -1;
750         return -1;      
751 }
752
753 static void sig_child_handler(struct event_context *ev,
754         struct signal_event *se, int signum, int count,
755         void *dont_care, 
756         void *private_data)
757 {
758 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
759         int status;
760         pid_t pid = -1;
761
762         while (pid != 0) {
763                 pid = waitpid(-1, &status, WNOHANG);
764                 if (pid == -1) {
765                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
766                         return;
767                 }
768                 if (pid > 0) {
769                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
770                 }
771         }
772 }
773
774 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
775                                       void *private_data)
776 {
777         if (status != 0) {
778                 ctdb_fatal(ctdb, "Failed to run setup event\n");
779                 return;
780         }
781         ctdb_run_notification_script(ctdb, "setup");
782
783         /* tell all other nodes we've just started up */
784         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
785                                  0, CTDB_CONTROL_STARTUP, 0,
786                                  CTDB_CTRL_FLAG_NOREPLY,
787                                  tdb_null, NULL, NULL);
788 }
789
790 /*
791   start the protocol going as a daemon
792 */
793 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
794 {
795         int res, ret = -1;
796         struct fd_event *fde;
797         const char *domain_socket_name;
798         struct signal_event *se;
799
800         /* get rid of any old sockets */
801         unlink(ctdb->daemon.name);
802
803         /* create a unix domain stream socket to listen to */
804         res = ux_socket_bind(ctdb);
805         if (res!=0) {
806                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
807                 exit(10);
808         }
809
810         if (do_fork && fork()) {
811                 return 0;
812         }
813
814         tdb_reopen_all(False);
815
816         if (do_fork) {
817                 setsid();
818                 close(0);
819                 if (open("/dev/null", O_RDONLY) != 0) {
820                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
821                         exit(11);
822                 }
823         }
824         block_signal(SIGPIPE);
825
826         ctdbd_pid = getpid();
827         ctdb->ctdbd_pid = ctdbd_pid;
828
829
830         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
831
832         if (ctdb->do_setsched) {
833                 /* try to set us up as realtime */
834                 ctdb_set_scheduler(ctdb);
835         }
836
837         /* ensure the socket is deleted on exit of the daemon */
838         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
839         if (domain_socket_name == NULL) {
840                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
841                 exit(12);
842         }
843
844         ctdb->ev = event_context_init(NULL);
845         tevent_loop_allow_nesting(ctdb->ev);
846         ret = ctdb_init_tevent_logging(ctdb);
847         if (ret != 0) {
848                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
849                 exit(1);
850         }
851
852         ctdb_set_child_logging(ctdb);
853
854         /* initialize statistics collection */
855         ctdb_statistics_init(ctdb);
856
857         /* force initial recovery for election */
858         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
859
860         if (strcmp(ctdb->transport, "tcp") == 0) {
861                 int ctdb_tcp_init(struct ctdb_context *);
862                 ret = ctdb_tcp_init(ctdb);
863         }
864 #ifdef USE_INFINIBAND
865         if (strcmp(ctdb->transport, "ib") == 0) {
866                 int ctdb_ibw_init(struct ctdb_context *);
867                 ret = ctdb_ibw_init(ctdb);
868         }
869 #endif
870         if (ret != 0) {
871                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
872                 return -1;
873         }
874
875         if (ctdb->methods == NULL) {
876                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
877                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
878         }
879
880         /* initialise the transport  */
881         if (ctdb->methods->initialise(ctdb) != 0) {
882                 ctdb_fatal(ctdb, "transport failed to initialise");
883         }
884         if (public_address_list) {
885                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
886                 if (ret == -1) {
887                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
888                         exit(1);
889                 }
890         }
891
892
893         /* attach to existing databases */
894         if (ctdb_attach_databases(ctdb) != 0) {
895                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
896         }
897
898         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
899         if (ret != 0) {
900                 ctdb_fatal(ctdb, "Failed to run init event\n");
901         }
902         ctdb_run_notification_script(ctdb, "init");
903
904         /* start frozen, then let the first election sort things out */
905         if (ctdb_blocking_freeze(ctdb)) {
906                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
907         }
908
909         /* now start accepting clients, only can do this once frozen */
910         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
911                            EVENT_FD_READ,
912                            ctdb_accept_client, ctdb);
913         tevent_fd_set_auto_close(fde);
914
915         /* release any IPs we hold from previous runs of the daemon */
916         if (ctdb->tunable.disable_ip_failover == 0) {
917                 ctdb_release_all_ips(ctdb);
918         }
919
920         /* start the transport going */
921         ctdb_start_transport(ctdb);
922
923         /* set up a handler to pick up sigchld */
924         se = event_add_signal(ctdb->ev, ctdb,
925                                      SIGCHLD, 0,
926                                      sig_child_handler,
927                                      ctdb);
928         if (se == NULL) {
929                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
930                 exit(1);
931         }
932
933         ret = ctdb_event_script_callback(ctdb,
934                                          ctdb,
935                                          ctdb_setup_event_callback,
936                                          ctdb,
937                                          false,
938                                          CTDB_EVENT_SETUP,
939                                          "");
940         if (ret != 0) {
941                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
942                 exit(1);
943         }
944
945         if (use_syslog) {
946                 if (start_syslog_daemon(ctdb)) {
947                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
948                         exit(10);
949                 }
950         }
951
952         ctdb_lockdown_memory(ctdb);
953           
954         /* go into a wait loop to allow other nodes to complete */
955         event_loop_wait(ctdb->ev);
956
957         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
958         exit(1);
959 }
960
961 /*
962   allocate a packet for use in daemon<->daemon communication
963  */
964 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
965                                                  TALLOC_CTX *mem_ctx, 
966                                                  enum ctdb_operation operation, 
967                                                  size_t length, size_t slength,
968                                                  const char *type)
969 {
970         int size;
971         struct ctdb_req_header *hdr;
972
973         length = MAX(length, slength);
974         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
975
976         if (ctdb->methods == NULL) {
977                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
978                          operation, (unsigned)length));
979                 return NULL;
980         }
981
982         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
983         if (hdr == NULL) {
984                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
985                          operation, (unsigned)length));
986                 return NULL;
987         }
988         talloc_set_name_const(hdr, type);
989         memset(hdr, 0, slength);
990         hdr->length       = length;
991         hdr->operation    = operation;
992         hdr->ctdb_magic   = CTDB_MAGIC;
993         hdr->ctdb_version = CTDB_VERSION;
994         hdr->generation   = ctdb->vnn_map->generation;
995         hdr->srcnode      = ctdb->pnn;
996
997         return hdr;     
998 }
999
1000 struct daemon_control_state {
1001         struct daemon_control_state *next, *prev;
1002         struct ctdb_client *client;
1003         struct ctdb_req_control *c;
1004         uint32_t reqid;
1005         struct ctdb_node *node;
1006 };
1007
1008 /*
1009   callback when a control reply comes in
1010  */
1011 static void daemon_control_callback(struct ctdb_context *ctdb,
1012                                     int32_t status, TDB_DATA data, 
1013                                     const char *errormsg,
1014                                     void *private_data)
1015 {
1016         struct daemon_control_state *state = talloc_get_type(private_data, 
1017                                                              struct daemon_control_state);
1018         struct ctdb_client *client = state->client;
1019         struct ctdb_reply_control *r;
1020         size_t len;
1021         int ret;
1022
1023         /* construct a message to send to the client containing the data */
1024         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1025         if (errormsg) {
1026                 len += strlen(errormsg);
1027         }
1028         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1029                                struct ctdb_reply_control);
1030         CTDB_NO_MEMORY_VOID(ctdb, r);
1031
1032         r->hdr.reqid     = state->reqid;
1033         r->status        = status;
1034         r->datalen       = data.dsize;
1035         r->errorlen = 0;
1036         memcpy(&r->data[0], data.dptr, data.dsize);
1037         if (errormsg) {
1038                 r->errorlen = strlen(errormsg);
1039                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1040         }
1041
1042         ret = daemon_queue_send(client, &r->hdr);
1043         if (ret != -1) {
1044                 talloc_free(state);
1045         }
1046 }
1047
1048 /*
1049   fail all pending controls to a disconnected node
1050  */
1051 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1052 {
1053         struct daemon_control_state *state;
1054         while ((state = node->pending_controls)) {
1055                 DLIST_REMOVE(node->pending_controls, state);
1056                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1057                                         "node is disconnected", state);
1058         }
1059 }
1060
1061 /*
1062   destroy a daemon_control_state
1063  */
1064 static int daemon_control_destructor(struct daemon_control_state *state)
1065 {
1066         if (state->node) {
1067                 DLIST_REMOVE(state->node->pending_controls, state);
1068         }
1069         return 0;
1070 }
1071
1072 /*
1073   this is called when the ctdb daemon received a ctdb request control
1074   from a local client over the unix domain socket
1075  */
1076 static void daemon_request_control_from_client(struct ctdb_client *client, 
1077                                                struct ctdb_req_control *c)
1078 {
1079         TDB_DATA data;
1080         int res;
1081         struct daemon_control_state *state;
1082         TALLOC_CTX *tmp_ctx = talloc_new(client);
1083
1084         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1085                 c->hdr.destnode = client->ctdb->pnn;
1086         }
1087
1088         state = talloc(client, struct daemon_control_state);
1089         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1090
1091         state->client = client;
1092         state->c = talloc_steal(state, c);
1093         state->reqid = c->hdr.reqid;
1094         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1095                 state->node = client->ctdb->nodes[c->hdr.destnode];
1096                 DLIST_ADD(state->node->pending_controls, state);
1097         } else {
1098                 state->node = NULL;
1099         }
1100
1101         talloc_set_destructor(state, daemon_control_destructor);
1102
1103         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1104                 talloc_steal(tmp_ctx, state);
1105         }
1106         
1107         data.dptr = &c->data[0];
1108         data.dsize = c->datalen;
1109         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1110                                        c->srvid, c->opcode, client->client_id,
1111                                        c->flags,
1112                                        data, daemon_control_callback,
1113                                        state);
1114         if (res != 0) {
1115                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1116                          c->hdr.destnode));
1117         }
1118
1119         talloc_free(tmp_ctx);
1120 }
1121
1122 /*
1123   register a call function
1124 */
1125 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1126                          ctdb_fn_t fn, int id)
1127 {
1128         struct ctdb_registered_call *call;
1129         struct ctdb_db_context *ctdb_db;
1130
1131         ctdb_db = find_ctdb_db(ctdb, db_id);
1132         if (ctdb_db == NULL) {
1133                 return -1;
1134         }
1135
1136         call = talloc(ctdb_db, struct ctdb_registered_call);
1137         call->fn = fn;
1138         call->id = id;
1139
1140         DLIST_ADD(ctdb_db->calls, call);        
1141         return 0;
1142 }
1143
1144
1145
1146 /*
1147   this local messaging handler is ugly, but is needed to prevent
1148   recursion in ctdb_send_message() when the destination node is the
1149   same as the source node
1150  */
1151 struct ctdb_local_message {
1152         struct ctdb_context *ctdb;
1153         uint64_t srvid;
1154         TDB_DATA data;
1155 };
1156
1157 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1158                                        struct timeval t, void *private_data)
1159 {
1160         struct ctdb_local_message *m = talloc_get_type(private_data, 
1161                                                        struct ctdb_local_message);
1162         int res;
1163
1164         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1165         if (res != 0) {
1166                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1167                           (unsigned long long)m->srvid));
1168         }
1169         talloc_free(m);
1170 }
1171
1172 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1173 {
1174         struct ctdb_local_message *m;
1175         m = talloc(ctdb, struct ctdb_local_message);
1176         CTDB_NO_MEMORY(ctdb, m);
1177
1178         m->ctdb = ctdb;
1179         m->srvid = srvid;
1180         m->data  = data;
1181         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1182         if (m->data.dptr == NULL) {
1183                 talloc_free(m);
1184                 return -1;
1185         }
1186
1187         /* this needs to be done as an event to prevent recursion */
1188         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1189         return 0;
1190 }
1191
1192 /*
1193   send a ctdb message
1194 */
1195 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1196                              uint64_t srvid, TDB_DATA data)
1197 {
1198         struct ctdb_req_message *r;
1199         int len;
1200
1201         if (ctdb->methods == NULL) {
1202                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1203                 return -1;
1204         }
1205
1206         /* see if this is a message to ourselves */
1207         if (pnn == ctdb->pnn) {
1208                 return ctdb_local_message(ctdb, srvid, data);
1209         }
1210
1211         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1212         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1213                                     struct ctdb_req_message);
1214         CTDB_NO_MEMORY(ctdb, r);
1215
1216         r->hdr.destnode  = pnn;
1217         r->srvid         = srvid;
1218         r->datalen       = data.dsize;
1219         memcpy(&r->data[0], data.dptr, data.dsize);
1220
1221         ctdb_queue_packet(ctdb, &r->hdr);
1222
1223         talloc_free(r);
1224         return 0;
1225 }
1226
1227
1228
1229 struct ctdb_client_notify_list {
1230         struct ctdb_client_notify_list *next, *prev;
1231         struct ctdb_context *ctdb;
1232         uint64_t srvid;
1233         TDB_DATA data;
1234 };
1235
1236
1237 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1238 {
1239         int ret;
1240
1241         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1242
1243         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1244         if (ret != 0) {
1245                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1246         }
1247
1248         return 0;
1249 }
1250
1251 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1252 {
1253         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1254         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1255         struct ctdb_client_notify_list *nl;
1256
1257         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1258
1259         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1260                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1261                 return -1;
1262         }
1263
1264         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1265                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1266                 return -1;
1267         }
1268
1269
1270         if (client == NULL) {
1271                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1272                 return -1;
1273         }
1274
1275         for(nl=client->notify; nl; nl=nl->next) {
1276                 if (nl->srvid == notify->srvid) {
1277                         break;
1278                 }
1279         }
1280         if (nl != NULL) {
1281                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1282                 return -1;
1283         }
1284
1285         nl = talloc(client, struct ctdb_client_notify_list);
1286         CTDB_NO_MEMORY(ctdb, nl);
1287         nl->ctdb       = ctdb;
1288         nl->srvid      = notify->srvid;
1289         nl->data.dsize = notify->len;
1290         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1291         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1292         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1293         
1294         DLIST_ADD(client->notify, nl);
1295         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1296
1297         return 0;
1298 }
1299
1300 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1301 {
1302         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1303         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1304         struct ctdb_client_notify_list *nl;
1305
1306         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1307
1308         if (client == NULL) {
1309                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1310                 return -1;
1311         }
1312
1313         for(nl=client->notify; nl; nl=nl->next) {
1314                 if (nl->srvid == notify->srvid) {
1315                         break;
1316                 }
1317         }
1318         if (nl == NULL) {
1319                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1320                 return -1;
1321         }
1322
1323         DLIST_REMOVE(client->notify, nl);
1324         talloc_set_destructor(nl, NULL);
1325         talloc_free(nl);
1326
1327         return 0;
1328 }
1329
1330 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1331 {
1332         struct ctdb_client_pid_list *client_pid;
1333
1334         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1335                 if (client_pid->pid == pid) {
1336                         return client_pid->client;
1337                 }
1338         }
1339         return NULL;
1340 }
1341
1342
1343 /* This control is used by samba when probing if a process (of a samba daemon)
1344    exists on the node.
1345    Samba does this when it needs/wants to check if a subrecord in one of the
1346    databases is still valied, or if it is stale and can be removed.
1347    If the node is in unhealthy or stopped state we just kill of the samba
1348    process holding htis sub-record and return to the calling samba that
1349    the process does not exist.
1350    This allows us to forcefully recall subrecords registered by samba processes
1351    on banned and stopped nodes.
1352 */
1353 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1354 {
1355         struct ctdb_client *client;
1356
1357         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1358                 client = ctdb_find_client_by_pid(ctdb, pid);
1359                 if (client != NULL) {
1360                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1361                         talloc_free(client);
1362                 }
1363                 return -1;
1364         }
1365
1366         return kill(pid, 0);
1367 }