2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
24 #include "system/network.h"
25 #include "system/filesys.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
34 #include "ctdb_private.h"
35 #include "ctdb_client.h"
36 #include "ctdb_logging.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
/*
 * Bookkeeping for a record that has been made "sticky" in a database.
 * Instances are allocated under ctdb_db->sticky_records by
 * ctdb_make_record_sticky() and looked up by the record's id-key.
 * NOTE(review): other code uses sr->pindown, but that member (and the
 * closing brace) is missing from this listing.
 */
43 struct ctdb_sticky_record {
44 struct ctdb_context *ctdb;
45 struct ctdb_db_context *ctdb_db;
50 find the ctdb_db from a db index
/*
 * Find the database whose db_id equals ID by walking ctdb->db_list.
 * NOTE(review): the return statements are not visible in this listing;
 * callers report "Unknown database" when the lookup fails, so presumably
 * NULL is returned when nothing matches.
 */
52 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
54 struct ctdb_db_context *ctdb_db;
56 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
57 if (ctdb_db->db_id == id) {
65 a variant of ctdb_input_pkt() that can be used as a lock requeue callback
/*
 * Requeue callback: P is the ctdb_context (checked with talloc_get_type)
 * and HDR the packet to re-inject through ctdb_input_pkt().
 * Passed as the requeue function to the ctdb_ltdb_lock_*_requeue() and
 * ctdb_add_revoke_deferred_call() helpers in this file, and called
 * directly by pinned_down_requeue().
 */
67 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
69 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
70 ctdb_input_pkt(ctdb, hdr);
/*
 * Send a CTDB_REPLY_ERROR back to the node that sent HDR.
 *
 * The message text is built printf-style from FMT and the variadic
 * arguments (PRINTF_ATTRIBUTE(4,5) lets the compiler check the format),
 * copied including its terminating NUL into the reply, and queued with
 * the request's reqid so the sender can match it.
 * If the transport is down (ctdb->methods == NULL) the failure is only
 * logged; the early return itself is not visible in this listing.
 * NOTE(review): the assignment of STATUS into the reply is not visible
 * in this listing.
 */
77 static void ctdb_send_error(struct ctdb_context *ctdb,
78 struct ctdb_req_header *hdr, uint32_t status,
79 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
80 static void ctdb_send_error(struct ctdb_context *ctdb,
81 struct ctdb_req_header *hdr, uint32_t status,
85 struct ctdb_reply_error *r;
89 if (ctdb->methods == NULL) {
90 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
95 msg = talloc_vasprintf(ctdb, fmt, ap);
97 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* msglen includes the terminating NUL so the receiver gets a C string. */
101 msglen = strlen(msg)+1;
102 len = offsetof(struct ctdb_reply_error, msg);
/* The reply packet is allocated as a talloc child of msg. */
103 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
104 struct ctdb_reply_error);
105 CTDB_NO_MEMORY_FATAL(ctdb, r);
/* Address the reply to the requester and echo its reqid. */
107 r->hdr.destnode = hdr->srcnode;
108 r->hdr.reqid = hdr->reqid;
111 memcpy(&r->msg[0], msg, msglen);
113 ctdb_queue_packet(ctdb, &r->hdr);
120 * send a redirect reply
122 * The logic behind this function is this:
124 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
125 * to its local ctdb (ctdb_request_call). If the node is not itself
126 * the record's DMASTER, it first redirects the packet to the
127 * record's LMASTER. The LMASTER then redirects the call packet to
128 * the current DMASTER. This works because when
129 * a record is migrated off a node, the new DMASTER is stored
130 * in the record's copy on the former DMASTER.
/*
 * Forward call request C one hop closer to the record's dmaster.
 * C is addressed to the lmaster of KEY. When this node is itself the
 * lmaster, it is addressed instead to HEADER->dmaster, the dmaster this
 * node has recorded for the record. The same packet is then re-queued.
 * NOTE(review): the "key" parameter declaration and the hopcount update
 * are not visible in this listing.
 */
132 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
133 struct ctdb_db_context *ctdb_db,
135 struct ctdb_req_call_old *c,
136 struct ctdb_ltdb_header *header)
138 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
140 c->hdr.destnode = lmaster;
141 if (ctdb->pnn == lmaster) {
142 c->hdr.destnode = header->dmaster;
/*
 * Warn when hopcount modulo 100 is 96..99. A request that keeps being
 * redirected is reported on 4 of every 100 hops, not on every hop.
 */
146 if (c->hopcount%100 > 95) {
147 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
148 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
149 "header->dmaster:%d dst:%d\n",
150 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
151 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
152 header->dmaster, c->hdr.destnode));
155 ctdb_queue_packet(ctdb, &c->hdr);
162 caller must have the chainlock before calling this routine. Caller must be
/*
 * As lmaster for KEY, hand the dmaster role for the record to
 * NEW_DMASTER:
 *   1. record new_dmaster in the local copy of the record and store it;
 *   2. send a CTDB_REPLY_DMASTER carrying the key, data, rsn and record
 *      flags to the new dmaster.
 * A failed store or a down transport is fatal.
 * NOTE(review): the "reqid" parameter declaration is not visible in this
 * listing; it is copied into the reply header below.
 */
165 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
166 struct ctdb_ltdb_header *header,
167 TDB_DATA key, TDB_DATA data,
168 uint32_t new_dmaster,
171 struct ctdb_context *ctdb = ctdb_db->ctdb;
172 struct ctdb_reply_dmaster *r;
/* Only the lmaster may answer; a mismatch is logged as an alert. */
176 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
177 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181 header->dmaster = new_dmaster;
182 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
184 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
/* The local copy was already updated above; without a transport the
 * reply cannot be delivered, so abort. */
188 if (ctdb->methods == NULL) {
189 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193 /* put the packet on a temporary context, allowing us to safely free
194 it below even if ctdb_reply_dmaster() has freed it already */
195 tmp_ctx = talloc_new(ctdb);
197 /* send the CTDB_REPLY_DMASTER */
198 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
199 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
200 struct ctdb_reply_dmaster);
201 CTDB_NO_MEMORY_FATAL(ctdb, r);
203 r->hdr.destnode = new_dmaster;
204 r->hdr.reqid = reqid;
205 r->hdr.generation = ctdb_db->generation;
206 r->rsn = header->rsn;
207 r->keylen = key.dsize;
208 r->datalen = data.dsize;
209 r->db_id = ctdb_db->db_id;
/* Payload layout: key bytes, then data bytes, then the 32-bit record flags. */
210 memcpy(&r->data[0], key.dptr, key.dsize);
211 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
212 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
214 ctdb_queue_packet(ctdb, &r->hdr);
216 talloc_free(tmp_ctx);
220 send a dmaster request (give another node the dmaster for a record)
222 This is always sent to the lmaster, which ensures that the lmaster
223 always knows who the dmaster is. The lmaster will then send a
224 CTDB_REPLY_DMASTER to the new dmaster
/*
 * Start migrating the dmaster role for KEY to the node that sent call
 * request C. The hand-over always goes through the record's lmaster:
 *   - if this node is the lmaster, it answers directly with
 *     ctdb_send_dmaster_reply();
 *   - otherwise it sends a CTDB_REQ_DMASTER to the lmaster, naming
 *     c->hdr.srcnode as the new dmaster. It also records that node as
 *     dmaster in its own copy before queueing the request.
 * A down transport or a failed local store is fatal.
 * NOTE(review): the return after the lmaster short-cut is not visible
 * in this listing.
 */
226 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
227 struct ctdb_req_call_old *c,
228 struct ctdb_ltdb_header *header,
229 TDB_DATA *key, TDB_DATA *data)
231 struct ctdb_req_dmaster *r;
232 struct ctdb_context *ctdb = ctdb_db->ctdb;
234 uint32_t lmaster = ctdb_lmaster(ctdb, key);
236 if (ctdb->methods == NULL) {
237 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* Flag that this migration carries record data. */
241 if (data->dsize != 0) {
242 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
245 if (lmaster == ctdb->pnn) {
246 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
247 c->hdr.srcnode, c->hdr.reqid);
251 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
253 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
254 struct ctdb_req_dmaster);
255 CTDB_NO_MEMORY_FATAL(ctdb, r);
256 r->hdr.destnode = lmaster;
257 r->hdr.reqid = c->hdr.reqid;
258 r->hdr.generation = ctdb_db->generation;
260 r->rsn = header->rsn;
261 r->dmaster = c->hdr.srcnode;
262 r->keylen = key->dsize;
263 r->datalen = data->dsize;
/* Payload layout: key bytes, then data bytes, then the 32-bit record flags. */
264 memcpy(&r->data[0], key->dptr, key->dsize);
265 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
266 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* Our own copy now names the requesting node as dmaster. */
268 header->dmaster = c->hdr.srcnode;
269 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
270 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
273 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * Timer callback that ends the pin-down period of sticky record SR
 * (armed in ctdb_set_sticky_pindown()). It frees sr->pindown.
 * Everything allocated under that context is freed with it:
 *   - the timer itself;
 *   - any requests parked there by ctdb_defer_pinned_down_request().
 * Freeing those requests runs pinned_down_destructor(), which requeues them.
 * NOTE(review): resetting sr->pindown afterwards is not visible in this
 * listing.
 */
278 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
279 struct tevent_timer *te,
280 struct timeval t, void *private_data)
282 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
283 struct ctdb_sticky_record);
285 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
286 if (sr->pindown != NULL) {
287 talloc_free(sr->pindown);
/*
 * Look up KEY in ctdb_db->sticky_records. For a sticky record that is
 * not already pinned down:
 *   - create its pindown context (a talloc child of the sticky record);
 *   - arm a timer on that context that calls ctdb_sticky_pindown_timeout()
 *     after ctdb->tunable.sticky_pindown milliseconds.
 *
 * Fix: the microseconds part of the timeout was computed as
 * (ms * 1000) % 1000000. That multiplication can overflow for large
 * tunable values. (ms % 1000) * 1000 gives the same result for every
 * value that does not overflow, and never exceeds 999000.
 *
 * NOTE(review): the return type line, the "k" declaration and the early
 * returns are missing from this listing.
 */
293 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
295 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
297 struct ctdb_sticky_record *sr;
299 k = ctdb_key_to_idkey(tmp_ctx, key);
301 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
302 talloc_free(tmp_ctx);
306 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
308 talloc_free(tmp_ctx);
312 talloc_free(tmp_ctx);
314 if (sr->pindown == NULL) {
315 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
316 sr->pindown = talloc_new(sr);
317 if (sr->pindown == NULL) {
318 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* The timer is allocated on sr->pindown, so its lifetime is tied to that context. */
321 tevent_add_timer(ctdb->ev, sr->pindown,
/* Split the millisecond tunable into whole seconds plus the sub-second
 * remainder in microseconds, without an intermediate overflow. */
322 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
323 (ctdb->tunable.sticky_pindown % 1000) * 1000),
324 ctdb_sticky_pindown_timeout, sr);
331 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
332 gets a CTDB_REQ_DMASTER for itself. We become the dmaster.
334 must be called with the chainlock held. This function releases the chainlock
/*
 * Take over the dmaster role for KEY on this node:
 *   - store the record with header.dmaster set to this node and the
 *     supplied RECORD_FLAGS;
 *   - for a sticky database, set up a pin-down for the record;
 *   - if a pending call matches hdr->reqid (and the key and reqid
 *     agree), run it locally via ctdb_call_local(), mark it
 *     CTDB_CALL_DONE and fire its async callback.
 * Must be called with the chainlock for KEY held; the ctdb_ltdb_unlock()
 * calls below release it.
 *
 * Fix: the "Dropped orphan" log message had its newline in the middle of
 * the format string ("...reqid:%u\n from node %u"). That split one log
 * entry in two and left the tail unterminated. The newline now ends the
 * message.
 *
 * NOTE(review): braces, returns and a few short lines (e.g. the
 * header.rsn assignment and the "ret" declaration) are missing from this
 * listing, so the nesting of the error paths is not fully visible here.
 */
336 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
337 struct ctdb_req_header *hdr,
338 TDB_DATA key, TDB_DATA data,
339 uint64_t rsn, uint32_t record_flags)
341 struct ctdb_call_state *state;
342 struct ctdb_context *ctdb = ctdb_db->ctdb;
343 struct ctdb_ltdb_header header;
346 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
350 header.dmaster = ctdb->pnn;
351 header.flags = record_flags;
353 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
356 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
358 * We temporarily add the VACUUM_MIGRATED flag to
359 * the record flags, so that ctdb_ltdb_store can
360 * decide whether the record should be stored or
363 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
367 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
368 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
370 ret = ctdb_ltdb_unlock(ctdb_db, key);
372 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
377 /* we just became DMASTER and this database is "sticky",
378 see if the record is flagged as "hot" and set up a pin-down
379 context to stop migrations for a little while if so
381 if (ctdb_db->sticky) {
382 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
386 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
387 ctdb->pnn, hdr->reqid, hdr->srcnode));
389 ret = ctdb_ltdb_unlock(ctdb_db, key);
391 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
396 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
397 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
399 ret = ctdb_ltdb_unlock(ctdb_db, key);
401 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
406 if (hdr->reqid != state->reqid) {
407 /* we found a record but it was the wrong one */
408 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u from node %u\n", hdr->reqid, hdr->srcnode));
410 ret = ctdb_ltdb_unlock(ctdb_db, key);
412 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
417 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
419 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
421 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
424 state->state = CTDB_CALL_DONE;
425 if (state->async.fn) {
426 state->async.fn(state);
/*
 * A request deferred by dmaster_defer_add() while a dmaster update for
 * its key is in progress. It is linked into a dmaster_defer_queue via
 * next/prev, and owns the deferred packet HDR.
 */
430 struct dmaster_defer_call {
431 struct dmaster_defer_call *next, *prev;
432 struct ctdb_context *ctdb;
433 struct ctdb_req_header *hdr;
/*
 * Per-key list of deferred requests, created by dmaster_defer_setup()
 * and stored in ctdb_db->defer_dmaster.
 * NOTE(review): the "generation" member used by the functions below (and
 * the closing brace) is not visible in this listing.
 */
436 struct dmaster_defer_queue {
437 struct ctdb_db_context *ctdb_db;
439 struct dmaster_defer_call *deferred_calls;
/*
 * Zero-delay timer callback scheduled by
 * dmaster_defer_queue_destructor(). It feeds one deferred request back
 * into ctdb_input_pkt().
 * NOTE(review): the remaining parameter lines and any cleanup of CALL
 * are not visible in this listing.
 */
442 static void dmaster_defer_reprocess(struct tevent_context *ev,
443 struct tevent_timer *te,
447 struct dmaster_defer_call *call = talloc_get_type(
448 private_data, struct dmaster_defer_call);
450 ctdb_input_pkt(call->ctdb, call->hdr);
/*
 * talloc destructor for a dmaster defer queue.
 * If the database generation changed since the queue was created (a
 * recovery happened in between), the queued requests are ignored; the
 * body of that branch is not visible in this listing.
 * Otherwise every deferred call is:
 *   - unlinked from the queue;
 *   - reparented onto the ctdb_context, so it survives the queue it was
 *     allocated under being freed;
 *   - replayed via a zero-timeout timer.
 */
454 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
456 /* Ignore requests, if database recovery happens in-between. */
457 if (ddq->generation != ddq->ctdb_db->generation) {
461 while (ddq->deferred_calls != NULL) {
462 struct dmaster_defer_call *call = ddq->deferred_calls;
464 DLIST_REMOVE(ddq->deferred_calls, call);
/* call was allocated under ddq (see dmaster_defer_add), which is being freed. */
466 talloc_steal(call->ctdb, call);
467 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
468 dmaster_defer_reprocess, call);
/*
 * Insert callback passed to trbt_insertarray32_callback() by
 * dmaster_defer_setup().
 * NOTE(review): its body is not visible in this listing.
 */
473 static void *insert_ddq_callback(void *parm, void *data)
482 * This function is used to register a key in the database that needs to be updated.
483 * Any requests for that key should get deferred till this is completed.
/*
 * Mark KEY as having a dmaster update in progress, so that
 * dmaster_defer_add() queues later requests for it.
 * A new queue is:
 *   - allocated as a talloc child of HDR;
 *   - tagged with hdr->generation;
 *   - inserted into ctdb_db->defer_dmaster.
 * Its destructor replays the queued requests when the queue is freed.
 * An existing queue for the current generation is kept. A queue left
 * over from an older generation (a recovery occurred) is replaced.
 * NOTE(review): the "key" parameter and the return statements are not
 * visible in this listing.
 */
485 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
486 struct ctdb_req_header *hdr,
490 struct dmaster_defer_queue *ddq;
/* The id-key (allocated on hdr) indexes the defer tree. */
492 k = ctdb_key_to_idkey(hdr, key);
494 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
499 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
501 if (ddq->generation == ctdb_db->generation) {
506 /* Recovery ocurred - get rid of old queue. All the deferred
507 * requests will be resent anyway from ctdb_call_resend_db.
512 ddq = talloc(hdr, struct dmaster_defer_queue);
514 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
518 ddq->ctdb_db = ctdb_db;
519 ddq->generation = hdr->generation;
520 ddq->deferred_calls = NULL;
522 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
523 insert_ddq_callback, ddq);
524 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
/*
 * Queue request HDR if a dmaster update for KEY is in progress, i.e. a
 * queue registered by dmaster_defer_setup() exists. The packet is moved
 * under a new dmaster_defer_call appended to the queue.
 * If the queue belongs to a different generation than HDR, its
 * destructor is cleared so it will not replay stale requests.
 * The caller, ctdb_request_call(), treats a return of 0 as "deferred".
 * NOTE(review): the "key" parameter and the return statements are not
 * visible in this listing.
 */
530 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
531 struct ctdb_req_header *hdr,
534 struct dmaster_defer_queue *ddq;
535 struct dmaster_defer_call *call;
538 k = ctdb_key_to_idkey(hdr, key);
540 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
544 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
552 if (ddq->generation != hdr->generation) {
553 talloc_set_destructor(ddq, NULL);
558 call = talloc(ddq, struct dmaster_defer_call);
560 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
564 call->ctdb = ctdb_db->ctdb;
/* The deferred call takes ownership of the packet. */
565 call->hdr = talloc_steal(call, hdr);
567 DLIST_ADD_END(ddq->deferred_calls, call, NULL);
573 called when a CTDB_REQ_DMASTER packet comes in
575 this comes into the lmaster for a record when the current dmaster
576 wants to give up the dmaster role and give it to someone else
/*
 * Handle an incoming CTDB_REQ_DMASTER on the lmaster. The current
 * dmaster of a record asks the lmaster to transfer the role to
 * c->dmaster. The lmaster:
 *   1. registers the key with dmaster_defer_setup();
 *   2. locks and fetches its copy of the record (the packet may be
 *      requeued via ctdb_call_input_pkt);
 *   3. checks that it is the lmaster and that the sender is the dmaster
 *      it has on record;
 *   4. either becomes dmaster itself (c->dmaster == this node) or sends
 *      a CTDB_REPLY_DMASTER to the new dmaster.
 * NOTE(review): several short lines (null/return checks, the key.dptr
 * and header.rsn assignments, the "len"/"ret" declarations) are missing
 * from this listing.
 */
578 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
580 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
581 TDB_DATA key, data, data2;
582 struct ctdb_ltdb_header header;
583 struct ctdb_db_context *ctdb_db;
584 uint32_t record_flags = 0;
588 ctdb_db = find_ctdb_db(ctdb, c->db_id);
590 ctdb_send_error(ctdb, hdr, -1,
591 "Unknown database in request. db_id==0x%08x",
596 if (hdr->generation != ctdb_db->generation) {
598 ("ctdb operation %u request %u from node %u to %u had an"
599 " invalid generation:%u while our generation is:%u\n",
600 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
601 hdr->generation, ctdb_db->generation));
606 key.dsize = c->keylen;
607 data.dptr = c->data + c->keylen;
608 data.dsize = c->datalen;
609 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
/* The 32-bit record flags trail the key and data. They are only read
 * when the packet is long enough to carry them; otherwise record_flags
 * stays 0. */
611 if (len <= c->hdr.length) {
612 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
613 sizeof(record_flags));
/* Register the key so dmaster_defer_add() can queue other requests for
 * it while this migration is in progress. */
616 dmaster_defer_setup(ctdb_db, hdr, key);
618 /* fetch the current record */
619 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
620 ctdb_call_input_pkt, ctdb, false);
622 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
626 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
630 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
631 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
632 ctdb->pnn, ctdb_lmaster(ctdb, &key),
633 hdr->generation, ctdb->vnn_map->generation));
634 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
637 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
638 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
640 /* its a protocol error if the sending node is not the current dmaster */
641 if (header.dmaster != hdr->srcnode) {
/* NOTE(review): the keyval argument below reads the first 4 key bytes
 * through a uint32_t pointer cast. That relies on key.dptr being
 * suitably aligned; copying into a local with memcpy would avoid the
 * alignment/strict-aliasing question. */
642 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
643 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
644 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
645 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
646 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
/* The mismatch is tolerated only when our copy has rsn 0 and names
 * this node as dmaster; otherwise force a recovery. */
647 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
648 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
650 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
651 ctdb_ltdb_unlock(ctdb_db, key);
/* A request carrying an older RSN than our copy is logged as an alert. */
656 if (header.rsn > c->rsn) {
657 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
658 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
659 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
660 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
663 /* use the rsn from the sending node */
666 /* store the record flags from the sending node */
667 header.flags = record_flags;
669 /* check if the new dmaster is the lmaster, in which case we
670 skip the dmaster reply */
671 if (c->dmaster == ctdb->pnn) {
672 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
674 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
676 ret = ctdb_ltdb_unlock(ctdb_db, key);
678 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/*
 * Timer callback armed by ctdb_make_record_sticky(). It fires
 * tunable.sticky_duration seconds after sticky record SR was created.
 * NOTE(review): the rest of the body is not visible in this listing;
 * presumably it releases SR, ending the sticky period.
 */
683 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
684 struct tevent_timer *te,
685 struct timeval t, void *private_data)
687 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
688 struct ctdb_sticky_record);
/*
 * Insert callback used by ctdb_make_record_sticky() when adding PARM
 * (the new sticky record) to ctdb_db->sticky_records. The log below
 * reports an existing entry DATA being replaced by PARM.
 * NOTE(review): the rest of the body is not visible in this listing.
 */
692 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
695 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/*
 * Make KEY sticky in CTDB_DB for ctdb->tunable.sticky_duration seconds:
 *   - look KEY up in ctdb_db->sticky_records;
 *   - otherwise allocate a ctdb_sticky_record under that tree and insert
 *     it keyed by the record's id-key;
 *   - arm ctdb_sticky_record_timeout() to fire when the period ends.
 * Called from ctdb_request_call() when a request's hopcount reaches
 * tunable.hopcount_make_sticky.
 * NOTE(review): the return type, the early return for an existing entry
 * and some member initialisations are not visible in this listing.
 */
702 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
704 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
706 struct ctdb_sticky_record *sr;
708 k = ctdb_key_to_idkey(tmp_ctx, key);
710 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
711 talloc_free(tmp_ctx);
715 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
717 talloc_free(tmp_ctx);
721 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
723 talloc_free(tmp_ctx);
724 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
729 sr->ctdb_db = ctdb_db;
732 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
733 ctdb->tunable.sticky_duration,
734 ctdb_db->db_name, ctdb_hash(&key)));
736 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* The timer is allocated under sr, so its lifetime is tied to sr. */
738 tevent_add_timer(ctdb->ev, sr,
739 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
740 ctdb_sticky_record_timeout, sr);
742 talloc_free(tmp_ctx);
/*
 * Carries a parked request from pinned_down_destructor() to the
 * zero-delay timer callback pinned_down_requeue().
 */
746 struct pinned_down_requeue_handle {
747 struct ctdb_context *ctdb;
748 struct ctdb_req_header *hdr;
/*
 * A request parked under a sticky record's pindown context by
 * ctdb_defer_pinned_down_request(). Its destructor
 * (pinned_down_destructor) requeues the request when the pindown
 * context is freed.
 */
751 struct pinned_down_deferred_call {
752 struct ctdb_context *ctdb;
753 struct ctdb_req_header *hdr;
/*
 * Zero-delay timer callback scheduled by pinned_down_destructor(). It
 * moves the parked packet onto the ctdb_context and re-injects it via
 * ctdb_call_input_pkt().
 * NOTE(review): any freeing of HANDLE afterwards is not visible in this
 * listing.
 */
756 static void pinned_down_requeue(struct tevent_context *ev,
757 struct tevent_timer *te,
758 struct timeval t, void *private_data)
760 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
761 struct ctdb_context *ctdb = handle->ctdb;
/* Reparent the packet so it no longer depends on HANDLE's lifetime. */
763 talloc_steal(ctdb, handle->hdr);
764 ctdb_call_input_pkt(ctdb, handle->hdr);
/*
 * talloc destructor for a request parked by
 * ctdb_defer_pinned_down_request(). It runs when PINNED_DOWN is freed,
 * i.e. when the sticky record's pindown context that owns it is
 * released. The parked packet is moved onto a new handle, and
 * pinned_down_requeue() is scheduled on a zero-timeout timer to
 * re-inject it.
 *
 * Fix: the handle allocation was dereferenced without a NULL check, so
 * running out of memory crashed on a NULL pointer. It now aborts through
 * CTDB_NO_MEMORY_FATAL like the other allocations in this file. A soft
 * failure is not an option: the parked packet is a talloc child of
 * PINNED_DOWN, which is being freed, so it would be freed along with it
 * and the request silently lost.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
769 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
771 struct ctdb_context *ctdb = pinned_down->ctdb;
772 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
CTDB_NO_MEMORY_FATAL(ctdb, handle);
774 handle->ctdb = pinned_down->ctdb;
775 handle->hdr = pinned_down->hdr;
/* Reparent the packet onto HANDLE so it survives PINNED_DOWN being freed. */
776 talloc_steal(handle, handle->hdr);
778 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
779 pinned_down_requeue, handle);
/*
 * Defer request HDR if KEY is a sticky record that is currently pinned
 * down (sr->pindown set). The packet is moved under a
 * pinned_down_deferred_call allocated on sr->pindown. That object's
 * destructor, pinned_down_destructor(), requeues the packet once the
 * pindown context is freed at the end of the pin-down period.
 * The caller, ctdb_request_call(), treats a return of 0 as "deferred".
 * NOTE(review): the return type and the return statements are not
 * visible in this listing.
 */
785 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
787 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
789 struct ctdb_sticky_record *sr;
790 struct pinned_down_deferred_call *pinned_down;
792 k = ctdb_key_to_idkey(tmp_ctx, key);
794 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
795 talloc_free(tmp_ctx);
799 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
801 talloc_free(tmp_ctx);
805 talloc_free(tmp_ctx);
807 if (sr->pindown == NULL) {
811 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
812 if (pinned_down == NULL) {
813 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
817 pinned_down->ctdb = ctdb;
818 pinned_down->hdr = hdr;
/* Take ownership of the packet; the destructor requeues it. */
820 talloc_set_destructor(pinned_down, pinned_down_destructor);
821 talloc_steal(pinned_down, hdr);
/*
 * Track the keys with the highest hop counts seen for CTDB_DB in
 * ctdb_db->statistics.hot_keys (at most MAX_HOT_KEYS entries).
 * Entry 0 holds the smallest count, so a HOPCOUNT that does not exceed
 * it is not recorded. Otherwise:
 *   - an existing entry for KEY has its count raised;
 *   - else KEY goes into a free slot, or (when the table is full) into
 *     a replaced slot;
 *   - finally the smallest count is swapped back into entry 0.
 * NOTE(review): the return type, the control-flow lines
 * (continue/return/goto) and the assignment of id for the full-table
 * case are not visible in this listing.
 */
827 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
831 /* smallest value is always at index 0 */
832 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
836 /* see if we already know this key */
837 for (i = 0; i < MAX_HOT_KEYS; i++) {
838 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
841 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
844 /* found an entry for this key */
845 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
848 ctdb_db->statistics.hot_keys[i].count = hopcount;
852 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
853 id = ctdb_db->statistics.num_hot_keys;
854 ctdb_db->statistics.num_hot_keys++;
859 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
860 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
862 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
/* NOTE(review): the talloc_memdup() result is not checked. On allocation
 * failure the entry keeps key.dsize with a NULL dptr, which the memcmp()
 * above would later dereference. */
863 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
864 ctdb_db->statistics.hot_keys[id].count = hopcount;
865 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
866 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
/* Restore the invariant: move the smallest count into entry 0. Entries
 * with count 0 are special-cased (their branch body is not visible). */
869 for (i = 1; i < MAX_HOT_KEYS; i++) {
870 if (ctdb_db->statistics.hot_keys[i].count == 0) {
873 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
874 hopcount = ctdb_db->statistics.hot_keys[i].count;
875 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
876 ctdb_db->statistics.hot_keys[0].count = hopcount;
878 key = ctdb_db->statistics.hot_keys[i].key;
879 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
880 ctdb_db->statistics.hot_keys[0].key = key;
886 called when a CTDB_REQ_CALL packet comes in
/*
 * Handle an incoming CTDB_REQ_CALL for a record in one of our databases.
 *
 * Outline of the visible logic:
 *   1. Reject unknown databases and stale generations.
 *   2. Decode the key and call data from the packet.
 *   3. Defer the request if the record is pinned down or a dmaster
 *      migration for the key is in progress.
 *   4. Lock and fetch the record (possibly requeueing via
 *      ctdb_call_input_pkt).
 *   5. Handle read-only delegation and revocation state.
 *   6. Redirect towards the lmaster/dmaster if we are not the dmaster.
 *   7. Update hop-count statistics and sticky-record state.
 *   8. Either migrate the record to the requesting node, or run the call
 *      locally and send a CTDB_REPLY_CALL.
 *
 * Fix: the "refusing migration" log message printed the record key with
 * %s. The key is binary data taken from the packet (c->data, length
 * c->keylen) and is not guaranteed to be NUL-terminated, so %s could
 * read past its end. The key is now logged as its ctdb_hash(), like the
 * other log messages in this function. Some comment typos are also
 * fixed.
 *
 * NOTE(review): many short lines (returns, braces and some declarations
 * such as "ret", "data", "len", "tdata") are missing from this listing.
 */
888 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
890 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
892 struct ctdb_reply_call_old *r;
894 struct ctdb_ltdb_header header;
895 struct ctdb_call *call;
896 struct ctdb_db_context *ctdb_db;
897 int tmp_count, bucket;
899 if (ctdb->methods == NULL) {
900 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
904 ctdb_db = find_ctdb_db(ctdb, c->db_id);
906 ctdb_send_error(ctdb, hdr, -1,
907 "Unknown database in request. db_id==0x%08x",
912 if (hdr->generation != ctdb_db->generation) {
914 ("ctdb operation %u request %u from node %u to %u had an"
915 " invalid generation:%u while our generation is:%u\n",
916 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
917 hdr->generation, ctdb_db->generation));
921 call = talloc(hdr, struct ctdb_call);
922 CTDB_NO_MEMORY_FATAL(ctdb, call);
924 call->call_id = c->callid;
925 call->key.dptr = c->data;
926 call->key.dsize = c->keylen;
927 call->call_data.dptr = c->data + c->keylen;
928 call->call_data.dsize = c->calldatalen;
929 call->reply_data.dptr = NULL;
930 call->reply_data.dsize = 0;
933 /* If this record is pinned down we should defer the
934 request until the pindown times out
936 if (ctdb_db->sticky) {
937 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
939 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
945 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
950 /* determine if we are the dmaster for this key. This also
951 fetches the record data (if any), thus avoiding a 2nd fetch of the data
952 if the call will be answered locally */
954 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
955 ctdb_call_input_pkt, ctdb, false);
957 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
962 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
967 /* Don't do READONLY if we don't have a tracking database */
968 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
969 c->flags &= ~CTDB_WANT_READONLY;
972 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
973 header.flags &= ~CTDB_REC_RO_FLAGS;
974 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
975 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
976 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
977 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
979 /* and clear out the tracking data */
980 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
981 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
985 /* if we are revoking, we must defer all other calls until the revoke
988 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
989 talloc_free(data.dptr);
990 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
992 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
993 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1000 * If we are not the dmaster and are not hosting any delegations,
1001 * then we redirect the request to the node that can answer it
1002 * (the lmaster or the dmaster).
1004 if ((header.dmaster != ctdb->pnn)
1005 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
1006 talloc_free(data.dptr);
1007 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1009 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1011 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1017 if ( (!(c->flags & CTDB_WANT_READONLY))
1018 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1019 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1020 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1021 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1023 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1025 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1026 ctdb_fatal(ctdb, "Failed to start record revoke");
1028 talloc_free(data.dptr);
1030 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1031 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1038 /* If this is the first request for delegation, bump rsn and set
1039 * the delegations flag
1041 if ((c->flags & CTDB_WANT_READONLY)
1042 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1043 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1045 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1046 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1047 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1050 if ((c->flags & CTDB_WANT_READONLY)
1051 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1054 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1055 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1056 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1058 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1059 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1063 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1065 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1068 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1069 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1070 struct ctdb_reply_call_old);
1071 CTDB_NO_MEMORY_FATAL(ctdb, r);
1072 r->hdr.destnode = c->hdr.srcnode;
1073 r->hdr.reqid = c->hdr.reqid;
1074 r->hdr.generation = ctdb_db->generation;
1076 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
1078 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1079 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1080 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1083 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1086 ctdb_queue_packet(ctdb, &r->hdr);
1087 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1088 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1095 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1096 tmp_count = c->hopcount;
1102 if (bucket >= MAX_COUNT_BUCKETS) {
1103 bucket = MAX_COUNT_BUCKETS - 1;
1105 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1106 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1107 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1109 /* If this database supports sticky records, then check if the
1110 hopcount is big. If it is it means the record is hot and we
1111 should make it sticky.
1113 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1114 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1118 /* Try if possible to migrate the record off to the caller node.
1119 * From the client's perspective a fetch of the data is just as
1120 * expensive as a migration.
1122 if (c->hdr.srcnode != ctdb->pnn) {
1123 if (ctdb_db->persistent_state) {
1124 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1125 " of key 0x%08x while transaction is active\n",
1126 ctdb_hash(&call->key)));
1128 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1129 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1130 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1131 talloc_free(data.dptr);
1133 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1135 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1142 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1144 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1148 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1150 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1153 len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1154 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1155 struct ctdb_reply_call_old);
1156 CTDB_NO_MEMORY_FATAL(ctdb, r);
1157 r->hdr.destnode = hdr->srcnode;
1158 r->hdr.reqid = hdr->reqid;
1159 r->hdr.generation = ctdb_db->generation;
1160 r->status = call->status;
1161 r->datalen = call->reply_data.dsize;
1162 if (call->reply_data.dsize) {
1163 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1166 ctdb_queue_packet(ctdb, &r->hdr);
1173 * called when a CTDB_REPLY_CALL packet comes in
1175 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1176 * contains any reply data from the call
/*
 * Handle a CTDB_REPLY_CALL for a call this node issued.
 *
 * The reply is matched to its pending ctdb_call_state by reqid and
 * generation. For CTDB_FETCH_WITH_HEADER_FUNC replies whose leading
 * ctdb_ltdb_header carries CTDB_REC_RO_HAVE_READONLY (a read-only
 * delegation), the local copy is updated under the record lock when the
 * reply's rsn is newer than ours. Finally:
 *   - the reply data and status are attached to the call;
 *   - the packet is reparented onto the state;
 *   - the call is marked done and its async callback fired.
 *
 * Fix: the leading ctdb_ltdb_header was read (header->flags, later
 * header->rsn) before checking that c->datalen is large enough to hold
 * it, so a short reply caused an out-of-bounds read. The length is now
 * checked before any header field is read. A short reply takes the same
 * path as a reply without a read-only delegation: read-only processing
 * is skipped and the reply is delivered as-is. The later length check is
 * kept as a defensive guard.
 *
 * NOTE(review): braces, returns and the "ret" declaration are missing
 * from this listing.
 */
1178 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1180 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1181 struct ctdb_call_state *state;
1183 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1184 if (state == NULL) {
1185 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1189 if (hdr->reqid != state->reqid) {
1190 /* we found a record but it was the wrong one */
1191 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1195 if (hdr->generation != state->generation) {
1197 ("ctdb operation %u request %u from node %u to %u had an"
1198 " invalid generation:%u while our generation is:%u\n",
1199 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1200 hdr->generation, state->generation));
1205 /* read only delegation processing */
1206 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1207 * delegation since we may need to update the record header
1209 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1210 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1211 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1212 struct ctdb_ltdb_header oldheader;
1213 TDB_DATA key, data, olddata;
1216 if (c->datalen < sizeof(struct ctdb_ltdb_header) || !(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1221 key.dsize = state->c->keylen;
1222 key.dptr = state->c->data;
1223 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1224 ctdb_call_input_pkt, ctdb, false);
1229 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1233 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1235 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1236 ctdb_ltdb_unlock(ctdb_db, key);
1240 if (header->rsn <= oldheader.rsn) {
1241 ctdb_ltdb_unlock(ctdb_db, key);
1245 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1246 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1247 ctdb_ltdb_unlock(ctdb_db, key);
1251 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1252 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1253 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1255 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1256 ctdb_ltdb_unlock(ctdb_db, key);
1260 ctdb_ltdb_unlock(ctdb_db, key);
1264 state->call->reply_data.dptr = c->data;
1265 state->call->reply_data.dsize = c->datalen;
1266 state->call->status = c->status;
1268 talloc_steal(state, c);
1270 state->state = CTDB_CALL_DONE;
1271 if (state->async.fn) {
1272 state->async.fn(state);
1278 * called when a CTDB_REPLY_DMASTER packet comes in
1280 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1281 * request packet. It means that the current dmaster wants to give us
/*
 * Looks up the database by c->db_id and ignores the packet for an unknown
 * database or when the packet generation differs from the database's.
 * The payload holds the key followed by the record data and, optionally,
 * a trailing 32-bit record_flags word.  After dmaster_defer_setup() the
 * record lock is taken with ctdb_ltdb_lock_requeue(), and then
 * ctdb_become_dmaster() is called with the received data and c->rsn.
 */
1284 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1286 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1287 struct ctdb_db_context *ctdb_db;
/* stays 0 unless the packet carries the optional trailing flags word */
1289 uint32_t record_flags = 0;
1293 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1294 if (ctdb_db == NULL) {
1295 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1299 if (hdr->generation != ctdb_db->generation) {
1301 ("ctdb operation %u request %u from node %u to %u had an"
1302 " invalid generation:%u while our generation is:%u\n",
1303 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1304 hdr->generation, ctdb_db->generation));
/*
 * NOTE(review): keylen and datalen come straight from the packet and are
 * used to locate key and data inside c->data without a visible check that
 * they fit within c->hdr.length (the len comparison below only gates the
 * optional record_flags copy).  Confirm the caller validates this, or add
 * a bounds check here.
 */
1309 key.dsize = c->keylen;
1310 data.dptr = &c->data[key.dsize];
1311 data.dsize = c->datalen;
/*
 * record_flags is only read from the bytes after key and data when the
 * computed len fits within the received packet length.
 */
1312 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1314 if (len <= c->hdr.length) {
1315 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1316 sizeof(record_flags));
1319 dmaster_defer_setup(ctdb_db, hdr, key);
/* ctdb_call_input_pkt re-feeds the packet through ctdb_input_pkt if requeued */
1321 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1322 ctdb_call_input_pkt, ctdb, false);
1327 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1331 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1336 called when a CTDB_REPLY_ERROR packet comes in
1338 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1340 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1341 struct ctdb_call_state *state;
1343 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1344 if (state == NULL) {
1345 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1346 ctdb->pnn, hdr->reqid));
1350 if (hdr->reqid != state->reqid) {
1351 /* we found a record but it was the wrong one */
1352 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1356 talloc_steal(state, c);
1358 state->state = CTDB_CALL_ERROR;
1359 state->errmsg = (char *)c->msg;
1360 if (state->async.fn) {
1361 state->async.fn(state);
1369 static int ctdb_call_destructor(struct ctdb_call_state *state)
1371 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1372 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1378 called when a ctdb_call needs to be resent after a reconfigure event
1380 static void ctdb_call_resend(struct ctdb_call_state *state)
1382 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1384 state->generation = state->ctdb_db->generation;
1386 /* use a new reqid, in case the old reply does eventually come in */
1387 reqid_remove(ctdb->idr, state->reqid);
1388 state->reqid = reqid_new(ctdb->idr, state);
1389 state->c->hdr.reqid = state->reqid;
1391 /* update the generation count for this request, so its valid with the new vnn_map */
1392 state->c->hdr.generation = state->generation;
1394 /* send the packet to ourselves, it will be redirected appropriately */
1395 state->c->hdr.destnode = ctdb->pnn;
1397 ctdb_queue_packet(ctdb, &state->c->hdr);
1398 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1399 state->ctdb_db->db_name, state->reqid, state->generation));
1403 resend all pending calls on recovery
1405 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1407 struct ctdb_call_state *state, *next;
1409 for (state = ctdb_db->pending_calls; state; state = next) {
1411 ctdb_call_resend(state);
1415 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1417 struct ctdb_db_context *ctdb_db;
1419 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1420 ctdb_call_resend_db(ctdb_db);
1425 this allows the caller to setup a async.fn
1427 static void call_local_trigger(struct tevent_context *ev,
1428 struct tevent_timer *te,
1429 struct timeval t, void *private_data)
1431 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1432 if (state->async.fn) {
1433 state->async.fn(state);
1439 construct an event driven local ctdb_call
1441 this is used so that locally processed ctdb_call requests are processed
1442 in an event driven manner
1444 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1445 struct ctdb_call *call,
1446 struct ctdb_ltdb_header *header,
1449 struct ctdb_call_state *state;
1450 struct ctdb_context *ctdb = ctdb_db->ctdb;
1453 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1454 CTDB_NO_MEMORY_NULL(ctdb, state);
1456 talloc_steal(state, data->dptr);
1458 state->state = CTDB_CALL_DONE;
1459 state->call = talloc(state, struct ctdb_call);
1460 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1461 *(state->call) = *call;
1462 state->ctdb_db = ctdb_db;
1464 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1466 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1469 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1470 call_local_trigger, state);
1477 make a remote ctdb call - async send. Called in daemon context.
1479 This constructs a ctdb_call request and queues it for processing.
1480 This call never blocks.
1482 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1483 struct ctdb_call *call,
1484 struct ctdb_ltdb_header *header)
1487 struct ctdb_call_state *state;
1488 struct ctdb_context *ctdb = ctdb_db->ctdb;
1490 if (ctdb->methods == NULL) {
1491 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1495 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1496 CTDB_NO_MEMORY_NULL(ctdb, state);
1497 state->call = talloc(state, struct ctdb_call);
1498 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1500 state->reqid = reqid_new(ctdb->idr, state);
1501 state->ctdb_db = ctdb_db;
1502 talloc_set_destructor(state, ctdb_call_destructor);
1504 len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1505 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1506 struct ctdb_req_call_old);
1507 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1508 state->c->hdr.destnode = header->dmaster;
1510 /* this limits us to 16k outstanding messages - not unreasonable */
1511 state->c->hdr.reqid = state->reqid;
1512 state->c->hdr.generation = ctdb_db->generation;
1513 state->c->flags = call->flags;
1514 state->c->db_id = ctdb_db->db_id;
1515 state->c->callid = call->call_id;
1516 state->c->hopcount = 0;
1517 state->c->keylen = call->key.dsize;
1518 state->c->calldatalen = call->call_data.dsize;
1519 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1520 memcpy(&state->c->data[call->key.dsize],
1521 call->call_data.dptr, call->call_data.dsize);
1522 *(state->call) = *call;
1523 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1524 state->call->key.dptr = &state->c->data[0];
1526 state->state = CTDB_CALL_WAIT;
1527 state->generation = ctdb_db->generation;
1529 DLIST_ADD(ctdb_db->pending_calls, state);
1531 ctdb_queue_packet(ctdb, &state->c->hdr);
1537 make a remote ctdb call - async recv - called in daemon context
1539 This is called when the program wants to wait for a ctdb_call to complete and get the
1540 results. This call will block unless the call has already completed.
/*
 * Runs tevent_loop_once() on the daemon event context until state->state
 * is no longer below CTDB_CALL_DONE.  If the final state is not
 * CTDB_CALL_DONE, state->errmsg is recorded with ctdb_set_error().  When
 * the call completed, the reply data is copied into memory allocated on
 * `call` (so it does not depend on `state`), along with the call status.
 */
1542 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
/* NOTE(review): relies on the ordering of the CTDB_CALL_* values (waiting states below DONE) - confirm against the enum. */
1544 while (state->state < CTDB_CALL_DONE) {
1545 tevent_loop_once(state->ctdb_db->ctdb->ev);
1547 if (state->state != CTDB_CALL_DONE) {
1548 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
/*
 * NOTE(review): the talloc_memdup() result is not checked; on allocation
 * failure call->reply_data.dptr would be NULL while dsize is non-zero.
 */
1553 if (state->call->reply_data.dsize) {
1554 call->reply_data.dptr = talloc_memdup(call,
1555 state->call->reply_data.dptr,
1556 state->call->reply_data.dsize);
1557 call->reply_data.dsize = state->call->reply_data.dsize;
1559 call->reply_data.dptr = NULL;
1560 call->reply_data.dsize = 0;
1562 call->status = state->call->status;
1569 send a keepalive packet to the other node
/*
 * When the transport is not set up (ctdb->methods == NULL) this only logs
 * at DEBUG_INFO (and presumably returns).  Otherwise it allocates a
 * CTDB_REQ_KEEPALIVE packet addressed to destnode, counts it in the
 * keepalive_packets_sent statistic and queues it.
 */
1571 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1573 struct ctdb_req_keepalive *r;
1575 if (ctdb->methods == NULL) {
1576 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1580 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1581 sizeof(struct ctdb_req_keepalive),
1582 struct ctdb_req_keepalive);
/* allocation failure is handled by CTDB_NO_MEMORY_FATAL (treated as fatal, per its name) */
1583 CTDB_NO_MEMORY_FATAL(ctdb, r);
1584 r->hdr.destnode = destnode;
1587 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1589 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * A packet whose processing is deferred while a read-only revoke is in
 * progress for its record.  It is allocated as a talloc child of the
 * revokechild_handle; its destructor (deferred_call_destructor) requeues
 * the packet.
 */
1596 struct revokechild_deferred_call {
1597 struct ctdb_context *ctdb;
/* the deferred packet; talloc-owned by this structure */
1598 struct ctdb_req_header *hdr;
/* callback used to re-inject hdr, together with the saved call context */
1599 deferred_requeue_fn fn;
/*
 * Tracks one forked revokechild process working on a record.  Active
 * handles are linked on ctdb_db->revokechild_active; the talloc destructor
 * (revokechild_destructor) kills the child and unlinks the handle.
 */
1603 struct revokechild_handle {
1604 struct revokechild_handle *next, *prev;
1605 struct ctdb_context *ctdb;
1606 struct ctdb_db_context *ctdb_db;
/* read event on the status pipe from the child (rc->fd[0]) */
1607 struct tevent_fd *fde;
/*
 * Carries a deferred packet from deferred_call_destructor to the
 * deferred_call_requeue timer callback, which calls fn and then frees
 * this handle.
 */
1614 struct revokechild_requeue_handle {
1615 struct ctdb_context *ctdb;
1616 struct ctdb_req_header *hdr;
1617 deferred_requeue_fn fn;
1621 static void deferred_call_requeue(struct tevent_context *ev,
1622 struct tevent_timer *te,
1623 struct timeval t, void *private_data)
1625 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1627 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1628 talloc_free(requeue_handle);
1631 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1633 struct ctdb_context *ctdb = deferred_call->ctdb;
1634 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1635 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
1637 requeue_handle->ctdb = ctdb;
1638 requeue_handle->hdr = deferred_call->hdr;
1639 requeue_handle->fn = deferred_call->fn;
1640 requeue_handle->ctx = deferred_call->ctx;
1641 talloc_steal(requeue_handle, requeue_handle->hdr);
1643 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1644 tevent_add_timer(ctdb->ev, requeue_handle,
1645 timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
1646 deferred_call_requeue, requeue_handle);
/*
 * talloc destructor for a revokechild_handle: frees the fd event, deals
 * with each pipe fd that is not -1, sends SIGKILL to the child and unlinks
 * the handle from ctdb_db->revokechild_active.
 *
 * NOTE(review): ctdb_start_revoke_ro_record enables
 * tevent_fd_set_auto_close() on rc->fde, so freeing rc->fde already closes
 * rc->fd[0].  If the fd[0] branch below also closes that descriptor, it is
 * a double close that may hit an unrelated, reused descriptor.  Confirm,
 * and mark rc->fd[0] as -1 once auto-close owns it.
 */
1652 static int revokechild_destructor(struct revokechild_handle *rc)
1654 if (rc->fde != NULL) {
1655 talloc_free(rc->fde);
1658 if (rc->fd[0] != -1) {
1661 if (rc->fd[1] != -1) {
1664 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1666 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
 * Read handler for the status pipe of a revokechild process.  Reads one
 * status byte from rc->fd[0]; an unsuccessful read or a non-zero status
 * byte from the child is logged at DEBUG_ERR.
 */
1670 static void revokechild_handler(struct tevent_context *ev,
1671 struct tevent_fd *fde,
1672 uint16_t flags, void *private_data)
1674 struct revokechild_handle *rc = talloc_get_type(private_data,
1675 struct revokechild_handle);
1679 ret = sys_read(rc->fd[0], &c, 1);
1681 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1687 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/*
 * Shared state for revoking all read-only delegations of one record.
 * Besides the record being revoked, it holds the number of outstanding
 * updaterecord controls (count), an overall status (set to -1 on any
 * failure) and a finished flag polled by ctdb_revoke_all_delegations().
 */
1696 struct ctdb_revoke_state {
1697 struct ctdb_db_context *ctdb_db;
1699 struct ctdb_ltdb_header *header;
/*
 * Completion callback for an updaterecord control sent by revoke_send_cb.
 * Collects the control result with ctdb_control_recv(); a receive error or
 * a non-zero result marks the whole revoke as failed (status = -1).  Every
 * completion decrements the outstanding count, and the revoke is flagged
 * finished once the count drops to zero or below.
 */
1706 static void update_record_cb(struct ctdb_client_control_state *state)
1708 struct ctdb_revoke_state *revoke_state;
1712 if (state == NULL) {
1715 revoke_state = state->async.private_data;
/* clear the callback before collecting the result */
1717 state->async.fn = NULL;
1718 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1719 if ((ret != 0) || (res != 0)) {
1720 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1721 revoke_state->status = -1;
1724 revoke_state->count--;
1725 if (revoke_state->count <= 0) {
1726 revoke_state->finished = 1;
1730 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1732 struct ctdb_revoke_state *revoke_state = private_data;
1733 struct ctdb_client_control_state *state;
1735 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1736 if (state == NULL) {
1737 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1738 revoke_state->status = -1;
1741 state->async.fn = update_record_cb;
1742 state->async.private_data = revoke_state;
1744 revoke_state->count++;
1748 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1749 struct tevent_timer *te,
1750 struct timeval yt, void *private_data)
1752 struct ctdb_revoke_state *state = private_data;
1754 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1755 state->finished = 1;
/*
 * Runs in the revokechild process.  Sends an updaterecord control to every
 * node reported by ctdb_trackingdb_traverse() for tdata, then spins the
 * event loop until all replies arrived or control_timeout expired.  It then
 * re-reads the record under the chainlock and checks two things: the rsn is
 * not higher than header->rsn, and the record is still flagged
 * REVOKING_READONLY|HAVE_DELEGATIONS.  If both hold, the record is stored
 * with REVOKE_COMPLETE on success, or with REVOKING_READONLY cleared so the
 * revoke can be retried.
 *
 * NOTE(review): the talloc_zero() result is used without a NULL check.
 * NOTE(review): `finished` is only set by update_record_cb (when count
 * drops to <= 0) or by the timeout timer.  If no control is sent at all
 * (nothing tracked, or every send fails) the loop waits for the full
 * control_timeout.
 */
1759 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1761 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1762 struct ctdb_ltdb_header new_header;
1765 state->ctdb_db = ctdb_db;
1767 state->header = header;
1770 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1772 tevent_add_timer(ctdb->ev, state,
1773 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1774 ctdb_revoke_timeout_handler, state);
1776 while (state->finished == 0) {
1777 tevent_loop_once(ctdb->ev);
1780 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1781 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1785 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1786 ctdb_ltdb_unlock(ctdb_db, key);
1787 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* the record moved on while we were revoking */
1792 if (new_header.rsn > header->rsn) {
1793 ctdb_ltdb_unlock(ctdb_db, key);
1794 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1798 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1799 ctdb_ltdb_unlock(ctdb_db, key);
1800 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1806 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1807 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1809 if (state->status == 0) {
1811 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1813 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1814 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1816 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1817 ctdb_ltdb_unlock(ctdb_db, key);
1818 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1822 ctdb_ltdb_unlock(ctdb_db, key);
/*
 * Starts revoking the read-only delegations of a record.
 *
 * The caller's header is modified: REVOKING_READONLY, HAVE_DELEGATIONS and
 * HAVE_READONLY are cleared and MIGRATED_WITH_DATA is set.  The tracking
 * data for key is fetched from ctdb_db->rottdb and, if non-empty, copied
 * onto the handle.  A child is forked; it runs
 * ctdb_revoke_all_delegations() and writes a one-byte status to rc->fd[1].
 * The parent links the handle on ctdb_db->revokechild_active and watches
 * rc->fd[0] with revokechild_handler.
 */
1829 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1832 struct revokechild_handle *rc;
/* recorded before forking so the child can watch for the parent's exit */
1833 pid_t parent = getpid();
1836 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1837 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1840 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1841 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1845 tdata = tdb_fetch(ctdb_db->rottdb, key);
1846 if (tdata.dsize > 0) {
1850 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1856 rc->ctdb_db = ctdb_db;
1860 talloc_set_destructor(rc, revokechild_destructor);
/* the handle keeps its own copy of the key */
1862 rc->key.dsize = key.dsize;
1863 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1864 if (rc->key.dptr == NULL) {
1865 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
/*
 * NOTE(review): this log text is identical to the key allocation failure
 * above.  If this branch handles a different failure (e.g. creating the
 * status pipe), the message is misleading - confirm and reword.
 */
1872 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1878 rc->child = ctdb_fork(ctdb);
1879 if (rc->child == (pid_t)-1) {
1880 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
/* child process */
1885 if (rc->child == 0) {
1888 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1890 ctdb_set_process_name("ctdb_revokechild");
1891 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1892 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1894 goto child_finished;
1897 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report the result to the parent as a single byte */
1900 sys_write(rc->fd[1], &c, 1);
1901 /* make sure we die when our parent dies */
1902 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
/* parent process */
1910 set_close_on_exec(rc->fd[0]);
1912 /* This is an active revokechild child process */
1913 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1915 rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1916 revokechild_handler, (void *)rc);
1917 if (rc->fde == NULL) {
1918 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
/* from here on, freeing rc->fde also closes rc->fd[0] */
1921 tevent_fd_set_auto_close(rc->fde);
1926 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1928 struct revokechild_handle *rc;
1929 struct revokechild_deferred_call *deferred_call;
1931 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1932 if (rc->key.dsize == 0) {
1935 if (rc->key.dsize != key.dsize) {
1938 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1944 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1948 deferred_call = talloc(rc, struct revokechild_deferred_call);
1949 if (deferred_call == NULL) {
1950 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1954 deferred_call->ctdb = ctdb;
1955 deferred_call->hdr = hdr;
1956 deferred_call->fn = fn;
1957 deferred_call->ctx = call_context;
1959 talloc_set_destructor(deferred_call, deferred_call_destructor);
1960 talloc_steal(deferred_call, hdr);