2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30 #include "common/reqid.h"
/* Per-record state for a "sticky" record: a hot record pinned to this node
 * to damp migration thrashing.
 * NOTE(review): listing is elided here — remaining members (presumably the
 * pindown talloc context used below) and the closing brace are not visible;
 * confirm against the full file. */
32 struct ctdb_sticky_record {
33 struct ctdb_context *ctdb;
34 struct ctdb_db_context *ctdb_db;
39 find the ctdb_db from a db index
/* Look up a ctdb_db context by database id: linear scan of the ctdb->db_list
 * linked list for a matching db_id.
 * NOTE(review): the return path is elided from this listing — presumably
 * returns the match or NULL when no database has this id; verify. */
41 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
43 struct ctdb_db_context *ctdb_db;
45 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
46 if (ctdb_db->db_id == id) {
54 a variant of input packet that can be used in lock requeue
/* Thin adapter used as a requeue callback for lock-requeue paths: recovers
 * the ctdb context from the opaque pointer and re-feeds the packet into the
 * normal input path. */
56 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
58 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
59 ctdb_input_pkt(ctdb, hdr);
/* Send a CTDB_REPLY_ERROR back to the requesting node, carrying a printf-
 * style formatted message. No-ops (after logging) when the transport is
 * down. The formatted message is NUL-terminated and copied into the packet.
 * NOTE(review): several lines (va_start/va_end, returns, braces) are elided
 * from this listing. */
66 static void ctdb_send_error(struct ctdb_context *ctdb,
67 struct ctdb_req_header *hdr, uint32_t status,
68 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
69 static void ctdb_send_error(struct ctdb_context *ctdb,
70 struct ctdb_req_header *hdr, uint32_t status,
74 struct ctdb_reply_error *r;
78 if (ctdb->methods == NULL) {
79 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
/* format the message on the ctdb context; fatal on allocation failure */
84 msg = talloc_vasprintf(ctdb, fmt, ap)
86 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* +1 so the trailing NUL travels with the message */
90 msglen = strlen(msg)+1;
91 len = offsetof(struct ctdb_reply_error, msg);
92 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
93 struct ctdb_reply_error);
94 CTDB_NO_MEMORY_FATAL(ctdb, r);
/* reply goes back to the original sender, matching its reqid */
96 r->hdr.destnode = hdr->srcnode;
97 r->hdr.reqid = hdr->reqid;
100 memcpy(&r->msg[0], msg, msglen);
102 ctdb_queue_packet(ctdb, &r->hdr);
109 * send a redirect reply
111 * The logic behind this function is this:
113 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
114 * to its local ctdb (ctdb_request_call). If the node is not itself
115 * the record's DMASTER, it first redirects the packet to the
116 * record's LMASTER. The LMASTER then redirects the call packet to
117 * the current DMASTER. Note that this works because of this: When
118 * a record is migrated off a node, then the new DMASTER is stored
119 * in the record's copy on the former DMASTER.
/* Redirect a CTDB_REQ_CALL we cannot answer: send it to the record's
 * lmaster, or — if we ARE the lmaster — directly to the dmaster recorded in
 * the local header. Logs a warning when the hopcount indicates the packet is
 * bouncing (hot record / stale dmaster chain). See the block comment above
 * this function for the full redirect protocol rationale. */
121 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
122 struct ctdb_db_context *ctdb_db,
124 struct ctdb_req_call *c,
125 struct ctdb_ltdb_header *header)
127 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
129 c->hdr.destnode = lmaster;
130 if (ctdb->pnn == lmaster) {
131 c->hdr.destnode = header->dmaster;
/* hopcount is presumably incremented in an elided line here — verify */
135 if (c->hopcount%100 > 95) {
136 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
137 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
138 "header->dmaster:%d dst:%d\n",
139 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
140 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
141 header->dmaster, c->hdr.destnode));
144 ctdb_queue_packet(ctdb, &c->hdr);
151 caller must have the chainlock before calling this routine. Caller must be
/* As lmaster, record the new dmaster locally and send a CTDB_REPLY_DMASTER
 * handing the record (key, data, flags) to new_dmaster. Caller must hold the
 * chainlock (see comment above). Fatal if the local store fails or the
 * transport is down. The packet layout is key bytes, then data bytes, then
 * the 32-bit record flags. */
154 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
155 struct ctdb_ltdb_header *header,
156 TDB_DATA key, TDB_DATA data,
157 uint32_t new_dmaster,
160 struct ctdb_context *ctdb = ctdb_db->ctdb;
161 struct ctdb_reply_dmaster *r;
/* only the lmaster may hand out dmaster-ship; loudly refuse otherwise */
165 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
166 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
/* persist the new dmaster before telling anyone about it */
170 header->dmaster = new_dmaster;
171 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
173 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
177 if (ctdb->methods == NULL) {
178 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
182 /* put the packet on a temporary context, allowing us to safely free
183 it below even if ctdb_reply_dmaster() has freed it already */
184 tmp_ctx = talloc_new(ctdb);
186 /* send the CTDB_REPLY_DMASTER */
187 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
188 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
189 struct ctdb_reply_dmaster);
190 CTDB_NO_MEMORY_FATAL(ctdb, r);
192 r->hdr.destnode = new_dmaster;
193 r->hdr.reqid = reqid;
194 r->hdr.generation = ctdb_db->generation;
195 r->rsn = header->rsn;
196 r->keylen = key.dsize;
197 r->datalen = data.dsize;
198 r->db_id = ctdb_db->db_id;
/* wire format: [key][data][flags] */
199 memcpy(&r->data[0], key.dptr, key.dsize);
200 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
201 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
203 ctdb_queue_packet(ctdb, &r->hdr);
205 talloc_free(tmp_ctx);
209 send a dmaster request (give another node the dmaster for a record)
211 This is always sent to the lmaster, which ensures that the lmaster
212 always knows who the dmaster is. The lmaster will then send a
213 CTDB_REPLY_DMASTER to the new dmaster
/* Give up dmaster-ship of a record to the requesting node: send a
 * CTDB_REQ_DMASTER to the lmaster (so the lmaster always knows the current
 * dmaster — see comment above), or short-circuit via
 * ctdb_send_dmaster_reply() when we ARE the lmaster. Also updates the local
 * record header to point at the new dmaster before queueing the packet. */
215 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
216 struct ctdb_req_call *c,
217 struct ctdb_ltdb_header *header,
218 TDB_DATA *key, TDB_DATA *data)
220 struct ctdb_req_dmaster *r;
221 struct ctdb_context *ctdb = ctdb_db->ctdb;
223 uint32_t lmaster = ctdb_lmaster(ctdb, key);
225 if (ctdb->methods == NULL) {
226 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* remember that this record migrated carrying data */
230 if (data->dsize != 0) {
231 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* lmaster is us: skip the REQ_DMASTER round-trip entirely */
234 if (lmaster == ctdb->pnn) {
235 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
236 c->hdr.srcnode, c->hdr.reqid);
/* wire format matches REPLY_DMASTER: [key][data][flags] */
240 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
242 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
243 struct ctdb_req_dmaster);
244 CTDB_NO_MEMORY_FATAL(ctdb, r);
245 r->hdr.destnode = lmaster;
246 r->hdr.reqid = c->hdr.reqid;
247 r->hdr.generation = ctdb_db->generation;
249 r->rsn = header->rsn;
250 r->dmaster = c->hdr.srcnode;
251 r->keylen = key->dsize;
252 r->datalen = data->dsize;
253 memcpy(&r->data[0], key->dptr, key->dsize);
254 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
255 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* record the handover locally before the packet leaves this node */
257 header->dmaster = c->hdr.srcnode;
258 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
259 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
262 ctdb_queue_packet(ctdb, &r->hdr);
/* Timer callback: the pindown period for a sticky record has expired — free
 * the pindown context, which (via destructors on deferred requests hung off
 * it) releases any calls queued against the pinned record. */
267 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
268 struct timeval t, void *private_data)
270 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
271 struct ctdb_sticky_record);
273 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
274 if (sr->pindown != NULL) {
275 talloc_free(sr->pindown);
/* Having just become dmaster of a record in a sticky database, pin the
 * record down for tunable.sticky_pindown milliseconds so it cannot
 * immediately migrate away again. No-op if the record is not registered as
 * sticky or is already pinned.
 * NOTE(review): the return type line and several returns/braces are elided
 * from this listing. */
281 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
283 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
285 struct ctdb_sticky_record *sr;
287 k = ctdb_key_to_idkey(tmp_ctx, key);
289 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
290 talloc_free(tmp_ctx);
/* only records previously made sticky have an entry in this tree */
294 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
296 talloc_free(tmp_ctx);
300 talloc_free(tmp_ctx);
302 if (sr->pindown == NULL) {
303 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
/* pindown context owns the timer; freeing it cancels the pindown */
304 sr->pindown = talloc_new(sr);
305 if (sr->pindown == NULL) {
306 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* sticky_pindown is in milliseconds; split into secs + usecs */
309 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
316 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
319 must be called with the chainlock held. This function releases the chainlock
/* Take over dmaster-ship of a record: store it locally with ourselves as
 * dmaster, then complete the pending call state identified by hdr->reqid.
 * Called for an incoming CTDB_REPLY_DMASTER, or when the lmaster receives a
 * CTDB_REQ_DMASTER naming itself. Must be entered with the chainlock held;
 * releases it on every path (see comment above). */
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322 struct ctdb_req_header *hdr,
323 TDB_DATA key, TDB_DATA data,
324 uint64_t rsn, uint32_t record_flags)
326 struct ctdb_call_state *state;
327 struct ctdb_context *ctdb = ctdb_db->ctdb;
328 struct ctdb_ltdb_header header;
331 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
/* build the new local header: we are dmaster now */
335 header.dmaster = ctdb->pnn;
336 header.flags = record_flags;
/* find the call state that requested this migration */
338 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
341 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
343 * We temporarily add the VACUUM_MIGRATED flag to
344 * the record flags, so that ctdb_ltdb_store can
345 * decide whether the record should be stored or
348 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
352 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
355 ret = ctdb_ltdb_unlock(ctdb_db, key);
357 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
362 /* we just became DMASTER and this database is "sticky",
363 see if the record is flagged as "hot" and set up a pin-down
364 context to stop migrations for a little while if so
366 if (ctdb_db->sticky) {
367 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
/* no matching call state: log, unlock and drop */
371 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372 ctdb->pnn, hdr->reqid, hdr->srcnode));
374 ret = ctdb_ltdb_unlock(ctdb_db, key);
376 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* sanity: the packet's key must match the key stored in the idr entry */
381 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
384 ret = ctdb_ltdb_unlock(ctdb_db, key);
386 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* reqid was recycled: the state we found belongs to a different call */
391 if (hdr->reqid != state->reqid) {
392 /* we found a record but it was the wrong one */
393 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode))
395 ret = ctdb_ltdb_unlock(ctdb_db, key);
397 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* we are dmaster now: answer the original call locally and complete it */
402 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
404 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 state->state = CTDB_CALL_DONE;
410 if (state->async.fn) {
411 state->async.fn(state);
/* One deferred packet waiting for an in-flight dmaster migration of its key
 * to finish; linked into a dmaster_defer_queue.
 * NOTE(review): closing brace elided from this listing. */
415 struct dmaster_defer_call {
416 struct dmaster_defer_call *next, *prev;
417 struct ctdb_context *ctdb;
418 struct ctdb_req_header *hdr;
/* Per-key queue of calls deferred while that key's dmaster is being
 * updated; keyed in ctdb_db->defer_dmaster.
 * NOTE(review): a generation member and the closing brace are elided from
 * this listing (generation is referenced by the destructor below). */
421 struct dmaster_defer_queue {
422 struct ctdb_db_context *ctdb_db;
424 struct dmaster_defer_call *deferred_calls;
/* Zero-delay timer callback: re-inject a previously deferred packet into
 * the normal input path once the dmaster update has completed. */
427 static void dmaster_defer_reprocess(struct tevent_context *ev,
428 struct tevent_timer *te,
432 struct dmaster_defer_call *call = talloc_get_type(
433 private_data, struct dmaster_defer_call);
435 ctdb_input_pkt(call->ctdb, call->hdr);
/* Destructor for a dmaster defer queue: reschedule every deferred call for
 * immediate reprocessing — unless a recovery changed the database generation
 * in the meantime, in which case the calls are dropped (clients will resend
 * via ctdb_call_resend_db). */
439 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
441 /* Ignore requests, if database recovery happens in-between. */
442 if (ddq->generation != ddq->ctdb_db->generation) {
446 while (ddq->deferred_calls != NULL) {
447 struct dmaster_defer_call *call = ddq->deferred_calls;
449 DLIST_REMOVE(ddq->deferred_calls, call);
/* reparent off the dying queue so the zero-timeout timer can own it */
451 talloc_steal(call->ctdb, call);
452 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
453 dmaster_defer_reprocess, call);
/* trbt insert callback for defer_dmaster tree entries.
 * NOTE(review): the body is entirely elided from this listing — behavior
 * cannot be documented from here; confirm against the full file. */
458 static void *insert_ddq_callback(void *parm, void *data)
467 * This function is used to register a key in the database that needs to be updated.
468 * Any requests for that key should get deferred till this is completed.
/* Register a key whose dmaster is about to be updated, so that further
 * requests for it can be deferred until the update completes. Reuses an
 * existing queue for the same generation; replaces a stale pre-recovery
 * queue; otherwise creates a new queue owned by hdr (so it is torn down —
 * firing its destructor — when the triggering packet is freed). */
470 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
471 struct ctdb_req_header *hdr,
475 struct dmaster_defer_queue *ddq;
477 k = ctdb_key_to_idkey(hdr, key);
479 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
/* already have a queue for this key? */
484 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
486 if (ddq->generation == ctdb_db->generation) {
491 /* Recovery occurred - get rid of old queue. All the deferred
492 * requests will be resent anyway from ctdb_call_resend_db.
497 ddq = talloc(hdr, struct dmaster_defer_queue);
499 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
503 ddq->ctdb_db = ctdb_db;
504 ddq->generation = hdr->generation;
505 ddq->deferred_calls = NULL;
507 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
508 insert_ddq_callback, ddq);
/* destructor flushes (or drops) the deferred calls when ddq is freed */
509 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
/* If a dmaster update is in progress for this key (a defer queue exists for
 * the current generation), park the incoming packet on that queue and take
 * ownership of it. A stale queue from before a recovery is disarmed instead.
 * NOTE(review): return statements are elided from this listing — callers in
 * this file treat 0 as "deferred". */
515 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
516 struct ctdb_req_header *hdr,
519 struct dmaster_defer_queue *ddq;
520 struct dmaster_defer_call *call;
523 k = ctdb_key_to_idkey(hdr, key);
525 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
529 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
/* queue predates a recovery: disarm its destructor so nothing is replayed */
537 if (ddq->generation != hdr->generation) {
538 talloc_set_destructor(ddq, NULL);
543 call = talloc(ddq, struct dmaster_defer_call);
545 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
549 call->ctdb = ctdb_db->ctdb;
/* the deferred call now owns the packet */
550 call->hdr = talloc_steal(call, hdr);
552 DLIST_ADD_END(ddq->deferred_calls, call, NULL);
558 called when a CTDB_REQ_DMASTER packet comes in
560 this comes into the lmaster for a record when the current dmaster
561 wants to give up the dmaster role and give it to someone else
/* Handle an incoming CTDB_REQ_DMASTER on the lmaster: the current dmaster
 * wants to hand the record to someone else (see comment above). Validates
 * db/generation, locks and fetches the record, sanity-checks that the sender
 * really is the current dmaster (forcing a recovery on protocol violation),
 * then either becomes dmaster itself or forwards via
 * ctdb_send_dmaster_reply(). */
563 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
565 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
566 TDB_DATA key, data, data2;
567 struct ctdb_ltdb_header header;
568 struct ctdb_db_context *ctdb_db;
569 uint32_t record_flags = 0;
573 ctdb_db = find_ctdb_db(ctdb, c->db_id);
575 ctdb_send_error(ctdb, hdr, -1,
576 "Unknown database in request. db_id==0x%08x",
/* stale packets from before a recovery are dropped */
581 if (hdr->generation != ctdb_db->generation) {
583 ("ctdb operation %u request %u from node %u to %u had an"
584 " invalid generation:%u while our generation is:%u\n",
585 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
586 hdr->generation, ctdb_db->generation));
/* unpack [key][data][flags] from the packet */
591 key.dsize = c->keylen;
592 data.dptr = c->data + c->keylen;
593 data.dsize = c->datalen;
594 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
596 if (len <= c->hdr.length) {
597 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
598 sizeof(record_flags));
/* defer any further requests for this key until we are done */
601 dmaster_defer_setup(ctdb_db, hdr, key);
603 /* fetch the current record */
604 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
605 ctdb_call_input_pkt, ctdb, false);
607 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
611 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
/* REQ_DMASTER must only ever arrive at the lmaster */
615 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
616 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
617 ctdb->pnn, ctdb_lmaster(ctdb, &key),
618 hdr->generation, ctdb->vnn_map->generation));
619 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
622 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
623 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
625 /* it's a protocol error if the sending node is not the current dmaster */
626 if (header.dmaster != hdr->srcnode) {
627 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
628 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
629 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
630 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
631 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
/* genuinely inconsistent cluster state: force a recovery */
632 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
633 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
635 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
636 ctdb_ltdb_unlock(ctdb_db, key);
/* RSN must never go backwards */
641 if (header.rsn > c->rsn) {
642 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
643 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
644 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
645 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
648 /* use the rsn from the sending node */
651 /* store the record flags from the sending node */
652 header.flags = record_flags;
654 /* check if the new dmaster is the lmaster, in which case we
655 skip the dmaster reply */
656 if (c->dmaster == ctdb->pnn) {
657 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
659 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
661 ret = ctdb_ltdb_unlock(ctdb_db, key);
663 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* Timer callback: the sticky_duration window for a record expired.
 * NOTE(review): the body beyond recovering the sticky record pointer is
 * elided — presumably frees sr to unregister the record; verify. */
668 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
669 struct timeval t, void *private_data)
671 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
672 struct ctdb_sticky_record);
/* trbt insert callback for sticky_records: an entry for this key already
 * exists — per the log message the old one (data) is replaced by the new
 * one (parm). NOTE(review): the free/return lines are elided. */
676 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
679 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/* Mark a hot record as "sticky" for tunable.sticky_duration seconds:
 * register it in ctdb_db->sticky_records and arm a timeout to unstick it.
 * No-op if the record is already registered.
 * NOTE(review): the return-type line and several returns/braces are elided
 * from this listing. */
686 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
688 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
690 struct ctdb_sticky_record *sr;
692 k = ctdb_key_to_idkey(tmp_ctx, key);
694 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
695 talloc_free(tmp_ctx);
/* already sticky? nothing to do */
699 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
701 talloc_free(tmp_ctx);
705 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
707 talloc_free(tmp_ctx);
708 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
713 sr->ctdb_db = ctdb_db;
716 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
717 ctdb->tunable.sticky_duration,
718 ctdb_db->db_name, ctdb_hash(&key)));
720 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* auto-unstick after sticky_duration seconds */
722 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
724 talloc_free(tmp_ctx);
/* Carries a previously pinned-down packet to the zero-delay requeue timer.
 * NOTE(review): closing brace elided from this listing. */
728 struct pinned_down_requeue_handle {
729 struct ctdb_context *ctdb;
730 struct ctdb_req_header *hdr;
/* A call deferred because its record is pinned down; hung off the pindown
 * context so its destructor requeues it when the pindown ends.
 * NOTE(review): closing brace elided from this listing. */
733 struct pinned_down_deferred_call {
734 struct ctdb_context *ctdb;
735 struct ctdb_req_header *hdr;
/* Zero-delay timer callback: hand the packet back to the ctdb context and
 * push it through the input path again now that the pindown has expired. */
738 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
739 struct timeval t, void *private_data)
741 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
742 struct ctdb_context *ctdb = handle->ctdb;
744 talloc_steal(ctdb, handle->hdr);
745 ctdb_call_input_pkt(ctdb, handle->hdr);
/* Destructor for a pinned-down deferred call: fires when the pindown
 * context is freed (timeout or unstick). Moves the packet onto a fresh
 * requeue handle and schedules immediate reprocessing via a zero timeout. */
750 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
752 struct ctdb_context *ctdb = pinned_down->ctdb;
753 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
755 handle->ctdb = pinned_down->ctdb;
756 handle->hdr = pinned_down->hdr;
/* the handle, not the dying deferred call, now owns the packet */
757 talloc_steal(handle, handle->hdr);
759 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
/* If the requested record is currently pinned down, defer the request: park
 * it on the pindown context so that pinned_down_destructor() requeues it
 * automatically when the pindown ends.
 * NOTE(review): the return-type line and return statements are elided from
 * this listing — the caller in ctdb_request_call treats 0 as "deferred". */
765 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
767 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
769 struct ctdb_sticky_record *sr;
770 struct pinned_down_deferred_call *pinned_down;
772 k = ctdb_key_to_idkey(tmp_ctx, key);
774 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
775 talloc_free(tmp_ctx);
/* not a sticky record at all -> nothing to defer */
779 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
781 talloc_free(tmp_ctx);
785 talloc_free(tmp_ctx);
/* sticky but not currently pinned down -> process normally */
787 if (sr->pindown == NULL) {
/* parent is the pindown context: freeing it triggers the destructor */
791 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
792 if (pinned_down == NULL) {
793 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
797 pinned_down->ctdb = ctdb;
798 pinned_down->hdr = hdr;
800 talloc_set_destructor(pinned_down, pinned_down_destructor);
801 talloc_steal(pinned_down, hdr);
/* Maintain the per-database MAX_HOT_KEYS hottest-keys table, ordered so the
 * smallest hopcount sits at index 0. Updates an existing entry, fills a free
 * slot, or replaces the coldest entry; then re-establishes the index-0
 * minimum by a swap pass.
 * NOTE(review): the return-type line and several returns/braces are elided
 * from this listing. */
807 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
811 /* smallest value is always at index 0 */
812 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
816 /* see if we already know this key */
817 for (i = 0; i < MAX_HOT_KEYS; i++) {
818 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
821 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
824 /* found an entry for this key */
825 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
828 ctdb_db->statistics.hot_keys[i].count = hopcount;
/* not known yet: take a free slot if any, else evict index 0 (coldest) */
832 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
833 id = ctdb_db->statistics.num_hot_keys;
834 ctdb_db->statistics.num_hot_keys++;
839 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
840 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
842 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
843 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
844 ctdb_db->statistics.hot_keys[id].count = hopcount;
845 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
846 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
/* restore the invariant: move the new minimum count back to index 0 */
849 for (i = 1; i < MAX_HOT_KEYS; i++) {
850 if (ctdb_db->statistics.hot_keys[i].count == 0) {
853 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
854 hopcount = ctdb_db->statistics.hot_keys[i].count;
855 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
856 ctdb_db->statistics.hot_keys[0].count = hopcount;
858 key = ctdb_db->statistics.hot_keys[i].key;
859 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
860 ctdb_db->statistics.hot_keys[0].key = key;
866 called when a CTDB_REQ_CALL packet comes in
/* Handle an incoming CTDB_REQ_CALL: validate db/generation, lock and fetch
 * the record, then either defer (pinned-down / dmaster-update / revoke in
 * progress), redirect to lmaster/dmaster, grant a read-only delegation,
 * migrate the record to the caller, or execute the call locally and send a
 * CTDB_REPLY_CALL. Hop-count statistics and sticky-record promotion are
 * updated along the way. */
868 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
870 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
872 struct ctdb_reply_call *r;
874 struct ctdb_ltdb_header header;
875 struct ctdb_call *call;
876 struct ctdb_db_context *ctdb_db;
877 int tmp_count, bucket;
879 if (ctdb->methods == NULL) {
880 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
884 ctdb_db = find_ctdb_db(ctdb, c->db_id);
886 ctdb_send_error(ctdb, hdr, -1,
887 "Unknown database in request. db_id==0x%08x",
/* stale packets from before a recovery are dropped */
892 if (hdr->generation != ctdb_db->generation) {
894 ("ctdb operation %u request %u from node %u to %u had an"
895 " invalid generation:%u while our generation is:%u\n",
896 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
897 hdr->generation, ctdb_db->generation));
/* unpack the wire call: [key][call_data] follow the fixed header */
901 call = talloc(hdr, struct ctdb_call);
902 CTDB_NO_MEMORY_FATAL(ctdb, call);
904 call->call_id = c->callid;
905 call->key.dptr = c->data;
906 call->key.dsize = c->keylen;
907 call->call_data.dptr = c->data + c->keylen;
908 call->call_data.dsize = c->calldatalen;
909 call->reply_data.dptr = NULL;
910 call->reply_data.dsize = 0;
913 /* If this record is pinned down we should defer the
914 request until the pindown times out
916 if (ctdb_db->sticky) {
917 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
919 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
/* a dmaster update for this key is in flight: park the request */
925 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
930 /* determine if we are the dmaster for this key. This also
931 fetches the record data (if any), thus avoiding a 2nd fetch of the data
932 if the call will be answered locally */
934 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
935 ctdb_call_input_pkt, ctdb, false);
937 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
942 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
947 /* Don't do READONLY if we don't have a tracking database */
948 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
949 c->flags &= ~CTDB_WANT_READONLY;
/* a previously started revoke has finished: clear RO state + tracking db */
952 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
953 header.flags &= ~CTDB_REC_RO_FLAGS;
954 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
955 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
956 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
957 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
959 /* and clear out the tracking data */
960 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
961 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
965 /* if we are revoking, we must defer all other calls until the revoke
968 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
969 talloc_free(data.dptr);
970 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
972 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
973 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
980 * If we are not the dmaster and are not hosting any delegations,
981 * then we redirect the request to the node that can answer it
982 * (the lmaster or the dmaster).
984 if ((header.dmaster != ctdb->pnn)
985 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
986 talloc_free(data.dptr);
987 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
989 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
991 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* a write request while delegations exist: start revoking them and defer */
997 if ( (!(c->flags & CTDB_WANT_READONLY))
998 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
999 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1000 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1001 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1003 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1005 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1006 ctdb_fatal(ctdb, "Failed to start record revoke");
1008 talloc_free(data.dptr);
1010 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1011 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1018 /* If this is the first request for delegation. bump rsn and set
1019 * the delegations flag
1021 if ((c->flags & CTDB_WANT_READONLY)
1022 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1023 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1025 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1026 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1027 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
/* grant a read-only delegation: track the requester, reply with a header
 * marked HAVE_READONLY followed by the record data */
1030 if ((c->flags & CTDB_WANT_READONLY)
1031 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1034 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1035 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1036 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1038 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1039 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1043 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1045 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1048 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1049 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1050 struct ctdb_reply_call);
1051 CTDB_NO_MEMORY_FATAL(ctdb, r);
1052 r->hdr.destnode = c->hdr.srcnode;
1053 r->hdr.reqid = c->hdr.reqid;
1054 r->hdr.generation = ctdb_db->generation;
1056 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
/* the delegated copy is read-only and holds no delegations itself */
1058 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1059 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1060 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1063 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1066 ctdb_queue_packet(ctdb, &r->hdr);
1067 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1068 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
/* hop-count statistics: max, histogram bucket, and hot-key tracking */
1075 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1076 tmp_count = c->hopcount;
1082 if (bucket >= MAX_COUNT_BUCKETS) {
1083 bucket = MAX_COUNT_BUCKETS - 1;
1085 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1086 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1087 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1089 /* If this database supports sticky records, then check if the
1090 hopcount is big. If it is it means the record is hot and we
1091 should make it sticky.
1093 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1094 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1098 /* Try if possible to migrate the record off to the caller node.
1099 * From the clients perspective a fetch of the data is just as
1100 * expensive as a migration.
1102 if (c->hdr.srcnode != ctdb->pnn) {
1103 if (ctdb_db->persistent_state) {
1104 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1105 " of key %s while transaction is active\n",
1106 (char *)call->key.dptr));
1108 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1109 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1110 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1111 talloc_free(data.dptr);
1113 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1115 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* local path: run the call here and reply with its result */
1122 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1124 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1128 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1130 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1133 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1134 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1135 struct ctdb_reply_call);
1136 CTDB_NO_MEMORY_FATAL(ctdb, r);
1137 r->hdr.destnode = hdr->srcnode;
1138 r->hdr.reqid = hdr->reqid;
1139 r->hdr.generation = ctdb_db->generation;
1140 r->status = call->status;
1141 r->datalen = call->reply_data.dsize;
1142 if (call->reply_data.dsize) {
1143 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1146 ctdb_queue_packet(ctdb, &r->hdr);
1153 * called when a CTDB_REPLY_CALL packet comes in
1155 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1156 * contains any reply data from the call
/* Handle an incoming CTDB_REPLY_CALL (response to our CTDB_REQ_CALL): match
 * it to the pending call state by reqid, validate reqid/generation, apply
 * read-only delegation updates for FETCH_WITH_HEADER replies (store the
 * delegated copy locally if its RSN is newer), then hand the reply data to
 * the waiting caller and mark the call done. */
1158 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1160 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1161 struct ctdb_call_state *state;
1163 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1164 if (state == NULL) {
1165 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
/* reqid recycled: this state belongs to a different call */
1169 if (hdr->reqid != state->reqid) {
1170 /* we found a record but it was the wrong one */
1171 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1175 if (hdr->generation != state->generation) {
1177 ("ctdb operation %u request %u from node %u to %u had an"
1178 " invalid generation:%u while our generation is:%u\n",
1179 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1180 hdr->generation, state->generation));
1185 /* read only delegation processing */
1186 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1187 * delegation since we may need to update the record header
1189 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1190 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* reply payload is [ctdb_ltdb_header][record data] */
1191 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1192 struct ctdb_ltdb_header oldheader;
1193 TDB_DATA key, data, olddata;
/* not a delegated (read-only) copy: nothing to store locally */
1196 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1201 key.dsize = state->c->keylen;
1202 key.dptr = state->c->data;
1203 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1204 ctdb_call_input_pkt, ctdb, false);
1209 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1213 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1215 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1216 ctdb_ltdb_unlock(ctdb_db, key);
/* only store if the delegation carries a newer RSN than our copy */
1220 if (header->rsn <= oldheader.rsn) {
1221 ctdb_ltdb_unlock(ctdb_db, key);
1225 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1226 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1227 ctdb_ltdb_unlock(ctdb_db, key);
1231 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1232 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1233 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1235 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1236 ctdb_ltdb_unlock(ctdb_db, key);
1240 ctdb_ltdb_unlock(ctdb_db, key);
/* reply data points into the packet; steal the packet so it stays alive
 * as long as the call state does */
1244 state->call->reply_data.dptr = c->data;
1245 state->call->reply_data.dsize = c->datalen;
1246 state->call->status = c->status;
1248 talloc_steal(state, c);
1250 state->state = CTDB_CALL_DONE;
1251 if (state->async.fn) {
1252 state->async.fn(state);
1258 * called when a CTDB_REPLY_DMASTER packet comes in
1260 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1261 * request packet. It means that the current dmaster wants to give us
1264 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1266 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1267 struct ctdb_db_context *ctdb_db;
1269 uint32_t record_flags = 0;
/* resolve the database this reply is for */
1273 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1274 if (ctdb_db == NULL) {
1275 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
/* drop packets from a stale database generation */
1279 if (hdr->generation != ctdb_db->generation) {
1281 ("ctdb operation %u request %u from node %u to %u had an"
1282 " invalid generation:%u while our generation is:%u\n",
1283 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1284 hdr->generation, ctdb_db->generation));
/* key and record data are packed back to back in c->data */
1289 key.dsize = c->keylen;
1290 data.dptr = &c->data[key.dsize];
1291 data.dsize = c->datalen;
/* record flags are optionally appended after key+data; only read
 * them when the packet is long enough to actually contain them */
1292 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1294 if (len <= c->hdr.length) {
1295 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1296 sizeof(record_flags));
1299 dmaster_defer_setup(ctdb_db, hdr, key);
/* take the chainlock; on contention the packet is requeued and
 * re-processed later via ctdb_call_input_pkt() */
1301 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1302 ctdb_call_input_pkt, ctdb, false);
1307 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
/* we now take over as dmaster for this record */
1311 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1316 called when a CTDB_REPLY_ERROR packet comes in
1318 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1320 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1321 struct ctdb_call_state *state;
/* find the pending call this error reply belongs to */
1323 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1324 if (state == NULL) {
1325 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1326 ctdb->pnn, hdr->reqid));
1330 if (hdr->reqid != state->reqid) {
1331 /* we found a record but it was the wrong one */
1332 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
/* keep the packet alive as long as the call state, since
 * state->errmsg points into the packet buffer */
1336 talloc_steal(state, c);
1338 state->state = CTDB_CALL_ERROR;
1339 state->errmsg = (char *)c->msg;
/* fire the async completion callback, if one was registered */
1340 if (state->async.fn) {
1341 state->async.fn(state);
/*
  destructor for a pending ctdb_call: unlink it from the per-db
  pending-calls list and release its request id
 */
1349 static int ctdb_call_destructor(struct ctdb_call_state *state)
1351 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1352 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1358 called when a ctdb_call needs to be resent after a reconfigure event
1360 static void ctdb_call_resend(struct ctdb_call_state *state)
1362 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
/* stamp the request with the post-recovery generation */
1364 state->generation = state->ctdb_db->generation;
1366 /* use a new reqid, in case the old reply does eventually come in */
1367 reqid_remove(ctdb->idr, state->reqid);
1368 state->reqid = reqid_new(ctdb->idr, state);
1369 state->c->hdr.reqid = state->reqid;
1371 /* update the generation count for this request, so it's valid with the new vnn_map */
1372 state->c->hdr.generation = state->generation;
1374 /* send the packet to ourselves, it will be redirected appropriately */
1375 state->c->hdr.destnode = ctdb->pnn;
1377 ctdb_queue_packet(ctdb, &state->c->hdr);
1378 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1382 resend all pending calls on recovery
1384 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1386 struct ctdb_call_state *state, *next;
/* NOTE(review): 'next' is presumably saved from state->next at the top
 * of the loop body (that line is not visible here) so the walk stays
 * safe if the resend manipulates the list — confirm against full file */
1388 for (state = ctdb_db->pending_calls; state; state = next) {
1390 ctdb_call_resend(state);
1394 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1396 struct ctdb_db_context *ctdb_db;
/* resend pending calls on every attached database */
1398 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1399 ctdb_call_resend_db(ctdb_db);
1404 this allows the caller to setup a async.fn
/*
  zero-timeout event handler: fires the async completion callback for a
  locally completed call from within the event loop, giving the caller a
  chance to set state->async.fn after the send function returned
 */
1406 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1407 struct timeval t, void *private_data)
1409 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1410 if (state->async.fn) {
1411 state->async.fn(state);
1417 construct an event driven local ctdb_call
1419 this is used so that locally processed ctdb_call requests are processed
1420 in an event driven manner
1422 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1423 struct ctdb_call *call,
1424 struct ctdb_ltdb_header *header,
1427 struct ctdb_call_state *state;
1428 struct ctdb_context *ctdb = ctdb_db->ctdb;
1431 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1432 CTDB_NO_MEMORY_NULL(ctdb, state);
/* take ownership of the fetched record data for the life of the call */
1434 talloc_steal(state, data->dptr);
1436 state->state = CTDB_CALL_DONE;
/* keep a private copy of the caller's call structure */
1437 state->call = talloc(state, struct ctdb_call);
1438 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1439 *(state->call) = *call;
1440 state->ctdb_db = ctdb_db;
/* run the call function immediately against the local record copy */
1442 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1444 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
/* defer the completion callback via a zero-timeout event so the
 * caller can still attach state->async.fn after we return */
1447 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1454 make a remote ctdb call - async send. Called in daemon context.
1456 This constructs a ctdb_call request and queues it for processing.
1457 This call never blocks.
1459 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1460 struct ctdb_call *call,
1461 struct ctdb_ltdb_header *header)
1464 struct ctdb_call_state *state;
1465 struct ctdb_context *ctdb = ctdb_db->ctdb;
/* refuse to queue anything while the transport is down */
1467 if (ctdb->methods == NULL) {
1468 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1472 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1473 CTDB_NO_MEMORY_NULL(ctdb, state);
1474 state->call = talloc(state, struct ctdb_call);
1475 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1477 state->reqid = reqid_new(ctdb->idr, state);
1478 state->ctdb_db = ctdb_db;
/* destructor unlinks from pending_calls and frees the reqid */
1479 talloc_set_destructor(state, ctdb_call_destructor);
/* the packet carries the key immediately followed by the call data */
1481 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1482 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1483 struct ctdb_req_call);
1484 CTDB_NO_MEMORY_NULL(ctdb, state->c);
/* address the request to the record's current dmaster */
1485 state->c->hdr.destnode = header->dmaster;
1487 /* this limits us to 16k outstanding messages - not unreasonable */
1488 state->c->hdr.reqid = state->reqid;
1489 state->c->hdr.generation = ctdb_db->generation;
1490 state->c->flags = call->flags;
1491 state->c->db_id = ctdb_db->db_id;
1492 state->c->callid = call->call_id;
1493 state->c->hopcount = 0;
1494 state->c->keylen = call->key.dsize;
1495 state->c->calldatalen = call->call_data.dsize;
1496 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1497 memcpy(&state->c->data[call->key.dsize],
1498 call->call_data.dptr, call->call_data.dsize);
1499 *(state->call) = *call;
/* repoint the copied call at the packet's own key/data copies so
 * they remain valid for the lifetime of the request */
1500 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1501 state->call->key.dptr = &state->c->data[0];
1503 state->state = CTDB_CALL_WAIT;
1504 state->generation = ctdb_db->generation;
/* track the call so it can be resent after a recovery */
1506 DLIST_ADD(ctdb_db->pending_calls, state);
1508 ctdb_queue_packet(ctdb, &state->c->hdr);
1514 make a remote ctdb call - async recv - called in daemon context
1516 This is called when the program wants to wait for a ctdb_call to complete and get the
1517 results. This call will block unless the call has already completed.
1519 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
/* pump the event loop until the call reaches DONE or ERROR */
1521 while (state->state < CTDB_CALL_DONE) {
1522 event_loop_once(state->ctdb_db->ctdb->ev);
1524 if (state->state != CTDB_CALL_DONE) {
1525 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
/* copy the reply data out onto the caller's own talloc context */
1530 if (state->call->reply_data.dsize) {
1531 call->reply_data.dptr = talloc_memdup(call,
1532 state->call->reply_data.dptr,
1533 state->call->reply_data.dsize);
1534 call->reply_data.dsize = state->call->reply_data.dsize;
1536 call->reply_data.dptr = NULL;
1537 call->reply_data.dsize = 0;
1539 call->status = state->call->status;
1546 send a keepalive packet to the other node
1548 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1550 struct ctdb_req_keepalive *r;
/* nothing to do while the transport is down */
1552 if (ctdb->methods == NULL) {
1553 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1557 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1558 sizeof(struct ctdb_req_keepalive),
1559 struct ctdb_req_keepalive);
1560 CTDB_NO_MEMORY_FATAL(ctdb, r);
1561 r->hdr.destnode = destnode;
/* account for the keepalive in the statistics counters */
1564 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1566 ctdb_queue_packet(ctdb, &r->hdr);
/* a request deferred while a read-only revoke is in progress; it is
 * requeued through 'fn' when this structure is torn down */
1573 struct revokechild_deferred_call {
1574 struct ctdb_context *ctdb;
/* the deferred request packet itself */
1575 struct ctdb_req_header *hdr;
/* function used to re-inject the packet later */
1576 deferred_requeue_fn fn;
/* state for one active revoke child process; linked into the per-db
 * revokechild_active list */
1580 struct revokechild_handle {
1581 struct revokechild_handle *next, *prev;
1582 struct ctdb_context *ctdb;
1583 struct ctdb_db_context *ctdb_db;
/* fd event watching the status pipe from the child */
1584 struct fd_event *fde;
/* carries a deferred packet plus its requeue function from the
 * deferred-call destructor to the timed requeue event */
1591 struct revokechild_requeue_handle {
1592 struct ctdb_context *ctdb;
1593 struct ctdb_req_header *hdr;
1594 deferred_requeue_fn fn;
/*
  timed-event handler: re-inject a previously deferred request via the
  stored requeue function, then free the requeue handle
 */
1598 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1599 struct timeval t, void *private_data)
1601 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1603 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1604 talloc_free(requeue_handle);
/*
  destructor for a deferred call: rather than dropping the request when
  the revoke state is freed, schedule it for requeueing via a timed event
 */
1607 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1609 struct ctdb_context *ctdb = deferred_call->ctdb;
1610 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1611 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1613 requeue_handle->ctdb = ctdb;
1614 requeue_handle->hdr = deferred_call->hdr;
1615 requeue_handle->fn = deferred_call->fn;
1616 requeue_handle->ctx = deferred_call->ctx;
/* keep the packet alive beyond the deferred_call's destruction */
1617 talloc_steal(requeue_handle, requeue_handle->hdr);
1619 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1620 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
/*
  destructor for a revoke child handle: tear down the fd event, close
  the pipe ends, kill the child and unlink from the active list
 */
1626 static int revokechild_destructor(struct revokechild_handle *rc)
/* free the fd event before closing the fd it watches */
1628 if (rc->fde != NULL) {
1629 talloc_free(rc->fde);
1632 if (rc->fd[0] != -1) {
1635 if (rc->fd[1] != -1) {
1638 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1640 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
  fd event handler: read the one-byte status the revoke child writes on
  its pipe and log any failure
 */
1644 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1645 uint16_t flags, void *private_data)
1647 struct revokechild_handle *rc = talloc_get_type(private_data,
1648 struct revokechild_handle);
1652 ret = sys_read(rc->fd[0], &c, 1);
1654 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
/* a non-zero status byte means the child's revoke failed */
1660 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/* aggregate state for an in-progress revoke of all read-only
 * delegations of one record */
1669 struct ctdb_revoke_state {
1670 struct ctdb_db_context *ctdb_db;
/* record header to push to the delegation holders */
1672 struct ctdb_ltdb_header *header;
/*
  completion callback for one update-record control sent during a
  revoke: record failures, decrement the outstanding count and flag the
  overall revoke as finished when the last reply arrives
 */
1679 static void update_record_cb(struct ctdb_client_control_state *state)
1681 struct ctdb_revoke_state *revoke_state;
1685 if (state == NULL) {
1688 revoke_state = state->async.private_data;
/* clear the callback so it cannot fire again for this control */
1690 state->async.fn = NULL;
1691 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1692 if ((ret != 0) || (res != 0)) {
1693 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
/* any single failure marks the whole revoke as failed */
1694 revoke_state->status = -1;
1697 revoke_state->count--;
1698 if (revoke_state->count <= 0) {
1699 revoke_state->finished = 1;
/*
  per-node callback from the tracking-db traverse: send an async
  update-record control to node 'pnn' to revoke its delegation
 */
1703 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1705 struct ctdb_revoke_state *revoke_state = private_data;
1706 struct ctdb_client_control_state *state;
1708 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1709 if (state == NULL) {
1710 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1711 revoke_state->status = -1;
1714 state->async.fn = update_record_cb;
1715 state->async.private_data = revoke_state;
/* one more control is now outstanding */
1717 revoke_state->count++;
/*
  timeout handler: stop waiting for outstanding revoke controls by
  marking the revoke state as finished
 */
1721 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1722 struct timeval yt, void *private_data)
1724 struct ctdb_revoke_state *state = private_data;
1726 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1727 state->finished = 1;
/*
  revoke all read-only delegations for one record (runs in the revoke
  child): push the record to every node holding a delegation, wait for
  the replies (bounded by control_timeout), then update the local record
  flags according to the outcome
 */
1731 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1733 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1734 struct ctdb_ltdb_header new_header;
1737 state->ctdb_db = ctdb_db;
1739 state->header = header;
/* send an update-record control to each node listed in the tracking
 * record; revoke_send_cb bumps state->count per outstanding request */
1742 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
/* guard against nodes that never answer */
1744 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
1746 while (state->finished == 0) {
1747 event_loop_once(ctdb->ev);
1750 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1751 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1755 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1756 ctdb_ltdb_unlock(ctdb_db, key);
1757 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* sanity check: bail out if the record changed behind our back
 * while the revoke was in flight */
1762 if (new_header.rsn > header->rsn) {
1763 ctdb_ltdb_unlock(ctdb_db, key);
1764 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1768 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1769 ctdb_ltdb_unlock(ctdb_db, key);
1770 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1776 * If revoke on all nodes succeeds, revoke is complete. Otherwise,
1777 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1779 if (state->status == 0) {
1781 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1783 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1784 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1786 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1787 ctdb_ltdb_unlock(ctdb_db, key);
1788 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1792 ctdb_ltdb_unlock(ctdb_db, key);
/*
  start revoking all read-only delegations of a record: fork a child
  that performs the (blocking) revoke while the daemon stays responsive;
  the child reports its status back over a pipe watched by
  revokechild_handler
 */
1799 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1802 struct revokechild_handle *rc;
1803 pid_t parent = getpid();
/* clear all read-only related flags; after the revoke the record is
 * treated as freshly migrated with data */
1806 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1807 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1810 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1811 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
/* snapshot the tracking record (the list of delegation holders)
 * onto the handle so the child works on a stable copy */
1815 tdata = tdb_fetch(ctdb_db->rottdb, key);
1816 if (tdata.dsize > 0) {
1820 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1826 rc->ctdb_db = ctdb_db;
1830 talloc_set_destructor(rc, revokechild_destructor);
1832 rc->key.dsize = key.dsize;
1833 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1834 if (rc->key.dptr == NULL) {
1835 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1842 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
/* fork the worker child */
1848 rc->child = ctdb_fork(ctdb);
1849 if (rc->child == (pid_t)-1) {
1850 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
/* ---- child process ---- */
1855 if (rc->child == 0) {
1858 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1860 ctdb_set_process_name("ctdb_revokechild");
1861 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1862 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1864 goto child_finished;
1867 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report the result byte to the parent over the pipe */
1870 sys_write(rc->fd[1], &c, 1);
1871 /* make sure we die when our parent dies */
1872 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
/* ---- parent process ---- */
1880 set_close_on_exec(rc->fd[0]);
1882 /* This is an active revokechild child process */
1883 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
/* watch the read end of the pipe for the child's status byte */
1885 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1886 EVENT_FD_READ, revokechild_handler,
1888 if (rc->fde == NULL) {
1889 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1892 tevent_fd_set_auto_close(rc->fde);
1897 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1899 struct revokechild_handle *rc;
1900 struct revokechild_deferred_call *deferred_call;
1902 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1903 if (rc->key.dsize == 0) {
1906 if (rc->key.dsize != key.dsize) {
1909 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1915 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1919 deferred_call = talloc(rc, struct revokechild_deferred_call);
1920 if (deferred_call == NULL) {
1921 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1925 deferred_call->ctdb = ctdb;
1926 deferred_call->hdr = hdr;
1927 deferred_call->fn = fn;
1928 deferred_call->ctx = call_context;
1930 talloc_set_destructor(deferred_call, deferred_call_destructor);
1931 talloc_steal(deferred_call, hdr);