2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record {
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
38 find the ctdb_db from a db index
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
73 struct ctdb_reply_error *r;
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
83 msg = talloc_vasprintf(ctdb, fmt, ap);
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121 struct ctdb_db_context *ctdb_db,
123 struct ctdb_req_call *c,
124 struct ctdb_ltdb_header *header)
126 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128 c->hdr.destnode = lmaster;
129 if (ctdb->pnn == lmaster) {
130 c->hdr.destnode = header->dmaster;
134 if (c->hopcount%100 > 95) {
135 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140 header->dmaster, c->hdr.destnode));
143 ctdb_queue_packet(ctdb, &c->hdr);
150 caller must have the chainlock before calling this routine. Caller must be
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154 struct ctdb_ltdb_header *header,
155 TDB_DATA key, TDB_DATA data,
156 uint32_t new_dmaster,
159 struct ctdb_context *ctdb = ctdb_db->ctdb;
160 struct ctdb_reply_dmaster *r;
164 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
169 header->dmaster = new_dmaster;
170 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
172 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
176 if (ctdb->methods == NULL) {
177 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx = talloc_new(ctdb);
185 /* send the CTDB_REPLY_DMASTER */
186 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188 struct ctdb_reply_dmaster);
189 CTDB_NO_MEMORY_FATAL(ctdb, r);
191 r->hdr.destnode = new_dmaster;
192 r->hdr.reqid = reqid;
193 r->rsn = header->rsn;
194 r->keylen = key.dsize;
195 r->datalen = data.dsize;
196 r->db_id = ctdb_db->db_id;
197 memcpy(&r->data[0], key.dptr, key.dsize);
198 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
199 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb, &r->hdr);
203 talloc_free(tmp_ctx);
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
214 struct ctdb_req_call *c,
215 struct ctdb_ltdb_header *header,
216 TDB_DATA *key, TDB_DATA *data)
218 struct ctdb_req_dmaster *r;
219 struct ctdb_context *ctdb = ctdb_db->ctdb;
221 uint32_t lmaster = ctdb_lmaster(ctdb, key);
223 if (ctdb->methods == NULL) {
224 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
228 if (data->dsize != 0) {
229 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
232 if (lmaster == ctdb->pnn) {
233 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
234 c->hdr.srcnode, c->hdr.reqid);
238 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
240 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
241 struct ctdb_req_dmaster);
242 CTDB_NO_MEMORY_FATAL(ctdb, r);
243 r->hdr.destnode = lmaster;
244 r->hdr.reqid = c->hdr.reqid;
246 r->rsn = header->rsn;
247 r->dmaster = c->hdr.srcnode;
248 r->keylen = key->dsize;
249 r->datalen = data->dsize;
250 memcpy(&r->data[0], key->dptr, key->dsize);
251 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
252 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
254 header->dmaster = c->hdr.srcnode;
255 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
256 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb, &r->hdr);
264 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
265 struct timeval t, void *private_data)
267 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
268 struct ctdb_sticky_record);
270 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
271 if (sr->pindown != NULL) {
272 talloc_free(sr->pindown);
278 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
280 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
282 struct ctdb_sticky_record *sr;
284 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
286 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx);
291 k[0] = (key.dsize + 3) / 4 + 1;
292 memcpy(&k[1], key.dptr, key.dsize);
294 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
296 talloc_free(tmp_ctx);
300 talloc_free(tmp_ctx);
302 if (sr->pindown == NULL) {
303 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
304 sr->pindown = talloc_new(sr);
305 if (sr->pindown == NULL) {
306 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
309 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
/*
316 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
319 must be called with the chainlock held. This function releases the chainlock
*/
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322 struct ctdb_req_header *hdr,
323 TDB_DATA key, TDB_DATA data,
324 uint64_t rsn, uint32_t record_flags)
326 struct ctdb_call_state *state;
327 struct ctdb_context *ctdb = ctdb_db->ctdb;
328 struct ctdb_ltdb_header header;
331 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
/* NOTE(review): we take over dmaster-ship: header is rebuilt with
 * ourselves as dmaster and the flags sent by the previous dmaster. */
335 header.dmaster = ctdb->pnn;
336 header.flags = record_flags;
338 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
341 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
343 * We temporarily add the VACUUM_MIGRATED flag to
344 * the record flags, so that ctdb_ltdb_store can
345 * decide whether the record should be stored or
348 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
352 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
/* NOTE(review): chainlock is released before invoking the callback */
355 ret = ctdb_ltdb_unlock(ctdb_db, key);
357 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
362 /* we just became DMASTER and this database is "sticky",
363 see if the record is flagged as "hot" and set up a pin-down
364 context to stop migrations for a little while if so
366 if (ctdb_db->sticky) {
367 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
/* NOTE(review): the checks below guard against stale/bogus replies:
 * unknown reqid, mismatched key, or a reqid that maps to a different
 * in-flight call - each path unlocks the chain and bails out. */
371 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372 ctdb->pnn, hdr->reqid, hdr->srcnode));
374 ret = ctdb_ltdb_unlock(ctdb_db, key);
376 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
381 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
384 ret = ctdb_ltdb_unlock(ctdb_db, key);
386 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
391 if (hdr->reqid != state->reqid) {
392 /* we found a record but it was the wrong one */
393 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
395 ret = ctdb_ltdb_unlock(ctdb_db, key);
397 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* NOTE(review): we are now the dmaster - run the pending call locally
 * and complete the waiting ctdb_call_state. */
402 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
404 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 state->state = CTDB_CALL_DONE;
410 if (state->async.fn) {
411 state->async.fn(state);
/*
418 called when a CTDB_REQ_DMASTER packet comes in
420 this comes into the lmaster for a record when the current dmaster
421 wants to give up the dmaster role and give it to someone else
*/
423 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
425 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
426 TDB_DATA key, data, data2;
427 struct ctdb_ltdb_header header;
428 struct ctdb_db_context *ctdb_db;
429 uint32_t record_flags = 0;
434 key.dsize = c->keylen;
435 data.dptr = c->data + c->keylen;
436 data.dsize = c->datalen;
/* NOTE(review): newer senders append the record flags as a trailing
 * uint32_t after key+data; only read it when the packet is long enough. */
437 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
439 if (len <= c->hdr.length) {
440 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
441 sizeof(record_flags));
444 ctdb_db = find_ctdb_db(ctdb, c->db_id);
446 ctdb_send_error(ctdb, hdr, -1,
447 "Unknown database in request. db_id==0x%08x",
452 /* fetch the current record */
453 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
454 ctdb_call_input_pkt, ctdb, false);
456 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
460 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
/* NOTE(review): only the lmaster may process a REQ_DMASTER */
464 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
465 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
466 ctdb->pnn, ctdb_lmaster(ctdb, &key),
467 hdr->generation, ctdb->vnn_map->generation));
468 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
471 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
472 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
474 /* its a protocol error if the sending node is not the current dmaster */
475 if (header.dmaster != hdr->srcnode) {
476 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
477 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
478 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
479 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
480 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
481 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
482 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
484 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
485 ctdb_ltdb_unlock(ctdb_db, key);
490 if (header.rsn > c->rsn) {
491 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
492 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
493 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
494 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
497 /* use the rsn from the sending node */
500 /* store the record flags from the sending node */
501 header.flags = record_flags;
503 /* check if the new dmaster is the lmaster, in which case we
504 skip the dmaster reply */
505 if (c->dmaster == ctdb->pnn) {
506 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
508 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
510 ret = ctdb_ltdb_unlock(ctdb_db, key);
512 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
517 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
518 struct timeval t, void *private_data)
520 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
521 struct ctdb_sticky_record);
525 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
528 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
535 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
537 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
539 struct ctdb_sticky_record *sr;
541 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
543 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
544 talloc_free(tmp_ctx);
548 k[0] = (key.dsize + 3) / 4 + 1;
549 memcpy(&k[1], key.dptr, key.dsize);
551 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
553 talloc_free(tmp_ctx);
557 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
559 talloc_free(tmp_ctx);
560 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
565 sr->ctdb_db = ctdb_db;
568 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
569 ctdb->tunable.sticky_duration,
570 ctdb_db->db_name, ctdb_hash(&key)));
572 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
574 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
576 talloc_free(tmp_ctx);
/* Handle used to requeue a deferred packet from a zero-delay timer. */
struct pinned_down_requeue_handle {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};
/* A call deferred because its record is pinned down; requeued via the
 * talloc destructor when the pindown context is freed. */
struct pinned_down_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};
590 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
591 struct timeval t, void *private_data)
593 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
594 struct ctdb_context *ctdb = handle->ctdb;
596 talloc_steal(ctdb, handle->hdr);
597 ctdb_call_input_pkt(ctdb, handle->hdr);
602 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
604 struct ctdb_context *ctdb = pinned_down->ctdb;
605 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
607 handle->ctdb = pinned_down->ctdb;
608 handle->hdr = pinned_down->hdr;
609 talloc_steal(handle, handle->hdr);
611 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
617 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
619 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
621 struct ctdb_sticky_record *sr;
622 struct pinned_down_deferred_call *pinned_down;
624 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
626 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
627 talloc_free(tmp_ctx);
631 k[0] = (key.dsize + 3) / 4 + 1;
632 memcpy(&k[1], key.dptr, key.dsize);
634 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
636 talloc_free(tmp_ctx);
640 talloc_free(tmp_ctx);
642 if (sr->pindown == NULL) {
646 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
647 if (pinned_down == NULL) {
648 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
652 pinned_down->ctdb = ctdb;
653 pinned_down->hdr = hdr;
655 talloc_set_destructor(pinned_down, pinned_down_destructor);
656 talloc_steal(pinned_down, hdr);
/*
 * Track the hottest keys (highest hop counts) for this database in the
 * fixed-size statistics.hot_keys array. The smallest count is kept at
 * index 0 so cheap rejection of cold keys is possible.
 */
662 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
666 /* smallest value is always at index 0 */
667 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
671 /* see if we already know this key */
672 for (i = 0; i < MAX_HOT_KEYS; i++) {
673 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
676 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
679 /* found an entry for this key */
680 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
683 ctdb_db->statistics.hot_keys[i].count = hopcount;
/* NOTE(review): not found - either grow the table or replace slot 0
 * (the current minimum). */
687 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
688 id = ctdb_db->statistics.num_hot_keys;
689 ctdb_db->statistics.num_hot_keys++;
694 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
695 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
697 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
698 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
699 ctdb_db->statistics.hot_keys[id].count = hopcount;
700 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
701 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
/* NOTE(review): restore the invariant that slot 0 holds the smallest
 * count by swapping it with any smaller entry. */
704 for (i = 1; i < MAX_HOT_KEYS; i++) {
705 if (ctdb_db->statistics.hot_keys[i].count == 0) {
708 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
709 hopcount = ctdb_db->statistics.hot_keys[i].count;
710 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
711 ctdb_db->statistics.hot_keys[0].count = hopcount;
713 key = ctdb_db->statistics.hot_keys[i].key;
714 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
715 ctdb_db->statistics.hot_keys[0].key = key;
/*
721 called when a CTDB_REQ_CALL packet comes in
*/
723 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
725 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
727 struct ctdb_reply_call *r;
729 struct ctdb_ltdb_header header;
730 struct ctdb_call *call;
731 struct ctdb_db_context *ctdb_db;
732 int tmp_count, bucket;
734 if (ctdb->methods == NULL) {
735 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
740 ctdb_db = find_ctdb_db(ctdb, c->db_id);
742 ctdb_send_error(ctdb, hdr, -1,
743 "Unknown database in request. db_id==0x%08x",
/* NOTE(review): unpack the wire call: key and call data share c->data */
748 call = talloc(hdr, struct ctdb_call);
749 CTDB_NO_MEMORY_FATAL(ctdb, call);
751 call->call_id = c->callid;
752 call->key.dptr = c->data;
753 call->key.dsize = c->keylen;
754 call->call_data.dptr = c->data + c->keylen;
755 call->call_data.dsize = c->calldatalen;
756 call->reply_data.dptr = NULL;
757 call->reply_data.dsize = 0;
760 /* If this record is pinned down we should defer the
761 request until the pindown times out
763 if (ctdb_db->sticky) {
764 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
766 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
773 /* determine if we are the dmaster for this key. This also
774 fetches the record data (if any), thus avoiding a 2nd fetch of the data
775 if the call will be answered locally */
777 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
778 ctdb_call_input_pkt, ctdb, false);
780 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
785 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
790 /* Dont do READONLY if we dont have a tracking database */
791 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
792 c->flags &= ~CTDB_WANT_READONLY;
795 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
796 header.flags &= ~CTDB_REC_RO_FLAGS;
797 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
798 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
799 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
800 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
802 /* and clear out the tracking data */
803 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
804 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
808 /* if we are revoking, we must defer all other calls until the revoke
811 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
812 talloc_free(data.dptr);
813 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
815 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
816 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
823 * If we are not the dmaster and are not hosting any delegations,
824 * then we redirect the request to the node than can answer it
825 * (the lmaster or the dmaster).
827 if ((header.dmaster != ctdb->pnn)
828 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
829 talloc_free(data.dptr);
830 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
832 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
834 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* NOTE(review): a non-readonly request for a record with outstanding
 * readonly state must first revoke all delegations. */
840 if ( (!(c->flags & CTDB_WANT_READONLY))
841 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
842 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
843 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
844 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
846 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
848 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
849 ctdb_fatal(ctdb, "Failed to start record revoke");
851 talloc_free(data.dptr);
853 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
854 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
861 /* If this is the first request for delegation. bump rsn and set
862 * the delegations flag
864 if ((c->flags & CTDB_WANT_READONLY)
865 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
866 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
868 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
869 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
870 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
873 if ((c->flags & CTDB_WANT_READONLY)
874 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
/* NOTE(review): readonly delegation path - record the requesting node
 * in the tracking db and answer with header+data directly. */
877 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
878 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
879 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
881 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
882 ctdb_fatal(ctdb, "Failed to store trackingdb data");
886 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
888 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
891 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
892 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
893 struct ctdb_reply_call);
894 CTDB_NO_MEMORY_FATAL(ctdb, r);
895 r->hdr.destnode = c->hdr.srcnode;
896 r->hdr.reqid = c->hdr.reqid;
898 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
900 header.flags |= CTDB_REC_RO_HAVE_READONLY;
901 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
902 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
905 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
908 ctdb_queue_packet(ctdb, &r->hdr);
909 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
910 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
/* NOTE(review): hop-count statistics for this call */
917 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
918 tmp_count = c->hopcount;
924 if (bucket >= MAX_COUNT_BUCKETS) {
925 bucket = MAX_COUNT_BUCKETS - 1;
927 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
928 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
929 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
931 /* If this database supports sticky records, then check if the
932 hopcount is big. If it is it means the record is hot and we
933 should make it sticky.
935 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
936 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
940 /* Try if possible to migrate the record off to the caller node.
941 * From the clients perspective a fetch of the data is just as
942 * expensive as a migration.
944 if (c->hdr.srcnode != ctdb->pnn) {
945 if (ctdb_db->persistent_state) {
946 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
947 " of key %s while transaction is active\n",
948 (char *)call->key.dptr));
950 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
951 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
952 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
953 talloc_free(data.dptr);
955 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
957 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* NOTE(review): local call path - execute the call here and reply */
964 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
966 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
970 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
972 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
975 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
976 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
977 struct ctdb_reply_call);
978 CTDB_NO_MEMORY_FATAL(ctdb, r);
979 r->hdr.destnode = hdr->srcnode;
980 r->hdr.reqid = hdr->reqid;
981 r->status = call->status;
982 r->datalen = call->reply_data.dsize;
983 if (call->reply_data.dsize) {
984 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
987 ctdb_queue_packet(ctdb, &r->hdr);
/*
994 * called when a CTDB_REPLY_CALL packet comes in
996 * This packet comes in response to a CTDB_REQ_CALL request packet. It
997 * contains any reply data from the call
*/
999 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1001 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1002 struct ctdb_call_state *state;
1004 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1005 if (state == NULL) {
1006 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1010 if (hdr->reqid != state->reqid) {
1011 /* we found a record but it was the wrong one */
1012 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1017 /* read only delegation processing */
1018 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1019 * delegation since we may need to update the record header
1021 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1022 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1023 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1024 struct ctdb_ltdb_header oldheader;
1025 TDB_DATA key, data, olddata;
1028 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
/* NOTE(review): a readonly delegation was granted - store the new
 * header (and data) locally, but only if its rsn is newer than what
 * we already have. */
1033 key.dsize = state->c->keylen;
1034 key.dptr = state->c->data;
1035 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1036 ctdb_call_input_pkt, ctdb, false);
1041 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1045 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1047 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1048 ctdb_ltdb_unlock(ctdb_db, key);
1052 if (header->rsn <= oldheader.rsn) {
1053 ctdb_ltdb_unlock(ctdb_db, key);
1057 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1058 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1059 ctdb_ltdb_unlock(ctdb_db, key);
1063 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1064 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1065 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1067 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1068 ctdb_ltdb_unlock(ctdb_db, key);
1072 ctdb_ltdb_unlock(ctdb_db, key);
/* NOTE(review): hand reply data to the waiting caller and complete
 * the call state; the packet is kept alive by stealing it onto state. */
1076 state->call->reply_data.dptr = c->data;
1077 state->call->reply_data.dsize = c->datalen;
1078 state->call->status = c->status;
1080 talloc_steal(state, c);
1082 state->state = CTDB_CALL_DONE;
1083 if (state->async.fn) {
1084 state->async.fn(state);
1090 * called when a CTDB_REPLY_DMASTER packet comes in
1092 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1093 * request packet. It means that the current dmaster wants to give us
1096 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1098 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1099 struct ctdb_db_context *ctdb_db;
1101 uint32_t record_flags = 0;
1105 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1106 if (ctdb_db == NULL) {
1107 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1112 key.dsize = c->keylen;
1113 data.dptr = &c->data[key.dsize];
1114 data.dsize = c->datalen;
1115 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1117 if (len <= c->hdr.length) {
1118 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1119 sizeof(record_flags));
1122 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1123 ctdb_call_input_pkt, ctdb, false);
1128 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1132 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1137 called when a CTDB_REPLY_ERROR packet comes in
1139 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1141 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1142 struct ctdb_call_state *state;
1144 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1145 if (state == NULL) {
1146 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1147 ctdb->pnn, hdr->reqid));
1151 if (hdr->reqid != state->reqid) {
1152 /* we found a record but it was the wrong one */
1153 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1157 talloc_steal(state, c);
1159 state->state = CTDB_CALL_ERROR;
1160 state->errmsg = (char *)c->msg;
1161 if (state->async.fn) {
1162 state->async.fn(state);
/*
 * talloc destructor for a ctdb_call_state: unlink it from the daemon's
 * pending-calls list and release its reqid so the id can be reused.
 */
1170 static int ctdb_call_destructor(struct ctdb_call_state *state)
1172 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1173 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1179 called when a ctdb_call needs to be resent after a reconfigure event
/*
 * Resend one pending ctdb_call after a recovery/reconfigure event.
 * A fresh reqid is allocated so a late reply to the old request cannot
 * be matched to this state, the packet's generation is bumped to the
 * current vnn_map generation, and the packet is queued to ourselves so
 * normal routing redirects it to the right node.
 */
1181 static void ctdb_call_resend(struct ctdb_call_state *state)
1183 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1185 state->generation = ctdb->vnn_map->generation;
1187 /* use a new reqid, in case the old reply does eventually come in */
1188 ctdb_reqid_remove(ctdb, state->reqid);
1189 state->reqid = ctdb_reqid_new(ctdb, state);
1190 state->c->hdr.reqid = state->reqid;
1192 /* update the generation count for this request, so it is valid with the new vnn_map */
1193 state->c->hdr.generation = state->generation;
1195 /* send the packet to ourselves, it will be redirected appropriately */
1196 state->c->hdr.destnode = ctdb->pnn;
1198 ctdb_queue_packet(ctdb, &state->c->hdr);
1199 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1203 resend all pending calls on recovery
/*
 * Resend all pending calls after a recovery.  The loop iterates with a
 * saved 'next' pointer (its assignment is elided in this view —
 * presumably next = state->next) so that ctdb_call_resend() may safely
 * relink the current state.
 */
1205 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1207 struct ctdb_call_state *state, *next;
1208 for (state=ctdb->pending_calls;state;state=next) {
1210 ctdb_call_resend(state);
1215 this allows the caller to set up an async.fn
/*
 * Timed-event handler used to complete a locally processed call from
 * the event loop.  Deferring completion like this gives the caller a
 * chance to attach state->async.fn before it is invoked.
 */
1217 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1218 struct timeval t, void *private_data)
1220 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1221 if (state->async.fn) {
1222 state->async.fn(state);
1228 construct an event driven local ctdb_call
1230 this is used so that locally processed ctdb_call requests are processed
1231 in an event driven manner
/*
 * Construct an event-driven local ctdb_call.  The call is executed
 * immediately via ctdb_call_local(), but completion is signalled from a
 * zero-timeout event (call_local_trigger) so locally processed requests
 * look asynchronous to the caller, who can still set up async.fn.
 *
 * NOTE(review): some parameters/lines are elided in this view.
 */
1233 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1234 struct ctdb_call *call,
1235 struct ctdb_ltdb_header *header,
1238 struct ctdb_call_state *state;
1239 struct ctdb_context *ctdb = ctdb_db->ctdb;
1242 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1243 CTDB_NO_MEMORY_NULL(ctdb, state);
/* take ownership of the record data for the lifetime of the state */
1245 talloc_steal(state, data->dptr);
1247 state->state = CTDB_CALL_DONE;
1248 state->call = talloc(state, struct ctdb_call);
1249 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1250 *(state->call) = *call;
1251 state->ctdb_db = ctdb_db;
1253 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
/* a failure here is deliberately non-fatal; only logged */
1255 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
/* fire the completion callback from the event loop, not inline */
1258 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1265 make a remote ctdb call - async send. Called in daemon context.
1267 This constructs a ctdb_call request and queues it for processing.
1268 This call never blocks.
/*
 * Make a remote ctdb call — async send, daemon context.  Builds a
 * CTDB_REQ_CALL packet addressed to the record's current dmaster and
 * queues it; never blocks.  The returned state is linked into
 * ctdb->pending_calls (so it can be resent on recovery) and cleaned up
 * by ctdb_call_destructor.  Returns NULL when the transport is down or
 * on allocation failure (via CTDB_NO_MEMORY_NULL).
 */
1270 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1271 struct ctdb_call *call,
1272 struct ctdb_ltdb_header *header)
1275 struct ctdb_call_state *state;
1276 struct ctdb_context *ctdb = ctdb_db->ctdb;
1278 if (ctdb->methods == NULL) {
1279 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1283 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1284 CTDB_NO_MEMORY_NULL(ctdb, state);
1285 state->call = talloc(state, struct ctdb_call);
1286 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1288 state->reqid = ctdb_reqid_new(ctdb, state);
1289 state->ctdb_db = ctdb_db;
1290 talloc_set_destructor(state, ctdb_call_destructor);
/* packet payload carries key then call data back-to-back */
1292 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1293 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1294 struct ctdb_req_call);
1295 CTDB_NO_MEMORY_NULL(ctdb, state->c);
/* send towards the node we currently believe is dmaster */
1296 state->c->hdr.destnode = header->dmaster;
1298 /* this limits us to 16k outstanding messages - not unreasonable */
1299 state->c->hdr.reqid = state->reqid;
1300 state->c->flags = call->flags;
1301 state->c->db_id = ctdb_db->db_id;
1302 state->c->callid = call->call_id;
1303 state->c->hopcount = 0;
1304 state->c->keylen = call->key.dsize;
1305 state->c->calldatalen = call->call_data.dsize;
1306 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1307 memcpy(&state->c->data[call->key.dsize],
1308 call->call_data.dptr, call->call_data.dsize);
1309 *(state->call) = *call;
/* re-point the copied call at the packet's own copies of key/data so
 * the state does not depend on the caller's buffers */
1310 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1311 state->call->key.dptr = &state->c->data[0];
1313 state->state = CTDB_CALL_WAIT;
1314 state->generation = ctdb->vnn_map->generation;
/* track for resend-on-recovery */
1316 DLIST_ADD(ctdb->pending_calls, state);
1318 ctdb_queue_packet(ctdb, &state->c->hdr);
1324 make a remote ctdb call - async recv - called in daemon context
1326 This is called when the program wants to wait for a ctdb_call to complete and get the
1327 results. This call will block unless the call has already completed.
/*
 * Async receive half of a daemon ctdb call: run the event loop until
 * the state reaches CTDB_CALL_DONE (or CTDB_CALL_ERROR), then copy the
 * reply data and status into the caller's call structure.  Blocks
 * unless the call already completed.
 */
1329 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1331 while (state->state < CTDB_CALL_DONE) {
1332 event_loop_once(state->ctdb_db->ctdb->ev);
1334 if (state->state != CTDB_CALL_DONE) {
/* error path: propagate the message recorded by ctdb_reply_error() */
1335 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1340 if (state->call->reply_data.dsize) {
/* duplicate the reply so it outlives the call state */
1341 call->reply_data.dptr = talloc_memdup(call,
1342 state->call->reply_data.dptr,
1343 state->call->reply_data.dsize);
1344 call->reply_data.dsize = state->call->reply_data.dsize;
1346 call->reply_data.dptr = NULL;
1347 call->reply_data.dsize = 0;
1349 call->status = state->call->status;
1356 send a keepalive packet to the other node
/*
 * Send a CTDB_REQ_KEEPALIVE packet to @destnode.  A no-op (with an INFO
 * log) when the transport is down; allocation failure is fatal via
 * CTDB_NO_MEMORY_FATAL.
 */
1358 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1360 struct ctdb_req_keepalive *r;
1362 if (ctdb->methods == NULL) {
1363 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1367 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1368 sizeof(struct ctdb_req_keepalive),
1369 struct ctdb_req_keepalive);
1370 CTDB_NO_MEMORY_FATAL(ctdb, r);
1371 r->hdr.destnode = destnode;
1374 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1376 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * A request deferred while a revoke child is in flight; requeued via
 * 'fn' by deferred_call_destructor().  NOTE(review): additional members
 * (e.g. the 'ctx' used by the destructor) are elided in this view.
 */
1383 struct revokechild_deferred_call {
1384 struct ctdb_context *ctdb;
1385 struct ctdb_req_header *hdr; /* the deferred request packet */
1386 deferred_requeue_fn fn; /* how to re-inject hdr when revoke is done */
/*
 * Tracks one running revoke child process; linked into
 * ctdb_db->revokechild_active.  NOTE(review): further members (pipe
 * fds, child pid, key) are referenced elsewhere but elided here.
 */
1390 struct revokechild_handle {
1391 struct revokechild_handle *next, *prev; /* DLIST linkage */
1392 struct ctdb_context *ctdb;
1393 struct ctdb_db_context *ctdb_db;
1394 struct fd_event *fde; /* watches the child's status pipe */
/*
 * State for re-queueing one deferred request from a timed event after a
 * revoke completes.  NOTE(review): the 'ctx' member used by
 * deferred_call_requeue() is elided in this view.
 */
1401 struct revokechild_requeue_handle {
1402 struct ctdb_context *ctdb;
1403 struct ctdb_req_header *hdr; /* owned: stolen from the deferred call */
1404 deferred_requeue_fn fn;
/*
 * Timed-event handler: re-inject a previously deferred request through
 * its requeue function, then free the handle (which also frees the
 * stolen packet).
 */
1408 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1409 struct timeval t, void *private_data)
1411 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1413 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1414 talloc_free(requeue_handle);
/*
 * Destructor for a deferred call: rather than dropping the request,
 * transfer the packet to a requeue handle and schedule it from a timed
 * event.  NOTE(review): the talloc() result is used unchecked here —
 * on allocation failure this would dereference NULL; confirm against
 * upstream before relying on this path.
 */
1417 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1419 struct ctdb_context *ctdb = deferred_call->ctdb;
1420 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1421 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1423 requeue_handle->ctdb = ctdb;
1424 requeue_handle->hdr = deferred_call->hdr;
1425 requeue_handle->fn = deferred_call->fn;
1426 requeue_handle->ctx = deferred_call->ctx;
/* keep the packet alive past the deferred call's destruction */
1427 talloc_steal(requeue_handle, requeue_handle->hdr);
1429 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1430 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
/*
 * Destructor for a revoke child handle: drop the fd event, close the
 * pipe ends (close() calls elided in this view), kill the child and
 * unlink the handle from the database's active-revoke list.
 */
1436 static int revokechild_destructor(struct revokechild_handle *rc)
1438 if (rc->fde != NULL) {
/* free the event before closing the fd it watches */
1439 talloc_free(rc->fde);
1442 if (rc->fd[0] != -1) {
1445 if (rc->fd[1] != -1) {
1448 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1450 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
 * fd event handler for the revoke child's status pipe: read the single
 * status byte the child writes and log failures.  NOTE(review): the
 * handling after the reads (freeing rc etc.) is elided in this view.
 */
1454 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1455 uint16_t flags, void *private_data)
1457 struct revokechild_handle *rc = talloc_get_type(private_data,
1458 struct revokechild_handle);
1462 ret = sys_read(rc->fd[0], &c, 1);
1464 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno))
;
1470 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/*
 * Shared state for one revoke-all-delegations operation.
 * NOTE(review): members referenced elsewhere (key, data, count,
 * status, finished) are elided in this view.
 */
1479 struct ctdb_revoke_state {
1480 struct ctdb_db_context *ctdb_db;
1482 struct ctdb_ltdb_header *header; /* header of the record being revoked */
/*
 * Completion callback for one UPDATE_RECORD control sent by
 * revoke_send_cb(): record any failure in revoke_state->status,
 * decrement the outstanding-control count and mark the whole revoke
 * finished once the count reaches zero.
 */
1489 static void update_record_cb(struct ctdb_client_control_state *state)
1491 struct ctdb_revoke_state *revoke_state;
1495 if (state == NULL) {
1498 revoke_state = state->async.private_data;
/* clear the callback so the control is not completed twice */
1500 state->async.fn = NULL;
1501 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1502 if ((ret != 0) || (res != 0)) {
1503 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1504 revoke_state->status = -1;
1507 revoke_state->count--;
1508 if (revoke_state->count <= 0) {
1509 revoke_state->finished = 1;
/*
 * Per-node callback from ctdb_trackingdb_traverse(): send an async
 * UPDATE_RECORD control to @pnn (one of the delegation holders) and
 * bump the outstanding count; update_record_cb() completes it.
 */
1513 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1515 struct ctdb_revoke_state *revoke_state = private_data;
1516 struct ctdb_client_control_state *state;
1518 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1519 if (state == NULL) {
1520 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1521 revoke_state->status = -1;
1524 state->async.fn = update_record_cb;
1525 state->async.private_data = revoke_state;
1527 revoke_state->count++;
/*
 * Timed-event safety net: if the revoke has not completed within the
 * control timeout, log and force the wait loop to end.
 */
1531 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1532 struct timeval yt, void *private_data)
1534 struct ctdb_revoke_state *state = private_data;
1536 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1537 state->finished = 1;
/*
 * Revoke all read-only delegations for one record (runs in the revoke
 * child): send UPDATE_RECORD to every node in the tracking db, wait
 * (with timeout) for all replies, then re-lock and re-validate the
 * local record before writing back the new flags.
 *
 * NOTE(review): several lines (returns on the error paths, final
 * return) are elided in this view; comments cover visible code only.
 */
1541 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1543 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1544 struct ctdb_ltdb_header new_header;
1547 state->ctdb_db = ctdb_db;
1549 state->header = header;
/* fan out UPDATE_RECORD controls to every delegation holder */
1552 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1554 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
/* pump the event loop until all controls complete or we time out */
1556 while (state->finished == 0) {
1557 event_loop_once(ctdb->ev);
1560 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1561 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1565 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1566 ctdb_ltdb_unlock(ctdb_db, key);
1567 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* the record must not have moved on while we were revoking */
1572 if (new_header.rsn > header->rsn) {
1573 ctdb_ltdb_unlock(ctdb_db, key);
1574 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1578 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1579 ctdb_ltdb_unlock(ctdb_db, key);
1580 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1586 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1587 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1589 if (state->status == 0) {
1591 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1593 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1594 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1596 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1597 ctdb_ltdb_unlock(ctdb_db, key);
1598 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1602 ctdb_ltdb_unlock(ctdb_db, key);
/*
 * Start revoking all read-only delegations for @key: snapshot the
 * tracking-db record, fork a child that runs
 * ctdb_revoke_all_delegations() as a client, and watch the child's
 * status pipe from the daemon's event loop.  While the child runs, the
 * handle sits on ctdb_db->revokechild_active so incoming calls for the
 * same key can be deferred.
 *
 * NOTE(review): many lines (pipe() setup, error-path returns/frees,
 * close() of the child-side fd) are elided in this view.
 */
1609 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1612 struct revokechild_handle *rc;
1613 pid_t parent = getpid();
/* the copy of the header handed to the child no longer carries the
 * delegation flags; it is marked migrated-with-data instead */
1616 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1617 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1620 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1621 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
/* snapshot the tracking record listing the delegation holders */
1625 tdata = tdb_fetch(ctdb_db->rottdb, key);
1626 if (tdata.dsize > 0) {
1630 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1636 rc->ctdb_db = ctdb_db;
1640 talloc_set_destructor(rc, revokechild_destructor);
1642 rc->key.dsize = key.dsize;
1643 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1644 if (rc->key.dptr == NULL) {
1645 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1652 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1658 rc->child = ctdb_fork(ctdb);
1659 if (rc->child == (pid_t)-1) {
1660 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1665 if (rc->child == 0) {
/* --- child process: become a client and do the actual revoke --- */
1668 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1670 ctdb_set_process_name("ctdb_revokechild");
1671 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1672 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1674 goto child_finished;
1677 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report the one-byte status back to the parent over the pipe */
1680 sys_write(rc->fd[1], &c, 1);
1681 /* make sure we die when our parent dies */
1682 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
/* --- parent process: watch the child's status pipe --- */
1690 set_close_on_exec(rc->fd[0]);
1692 /* This is an active revokechild child process */
1693 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1695 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1696 EVENT_FD_READ, revokechild_handler,
1698 if (rc->fde == NULL) {
1699 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1702 tevent_fd_set_auto_close(rc->fde);
1707 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1709 struct revokechild_handle *rc;
1710 struct revokechild_deferred_call *deferred_call;
1712 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1713 if (rc->key.dsize == 0) {
1716 if (rc->key.dsize != key.dsize) {
1719 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1725 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1729 deferred_call = talloc(rc, struct revokechild_deferred_call);
1730 if (deferred_call == NULL) {
1731 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1735 deferred_call->ctdb = ctdb;
1736 deferred_call->hdr = hdr;
1737 deferred_call->fn = fn;
1738 deferred_call->ctx = call_context;
1740 talloc_set_destructor(deferred_call, deferred_call_destructor);
1741 talloc_steal(deferred_call, hdr);