2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30 #include "common/reqid.h"
31 #include "common/system.h"
/* Per-record "sticky" state: keeps a hot record pinned to this node.
 * NOTE(review): this extract is mangled — the leading numerals are stray
 * original line numbers, and further members (presumably a pindown talloc
 * context used below) plus the closing brace are elided from view. */
33 struct ctdb_sticky_record {
34 struct ctdb_context *ctdb;
35 struct ctdb_db_context *ctdb_db;
40 find the ctdb_db from a db index
42 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
44 struct ctdb_db_context *ctdb_db;
46 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
47 if (ctdb_db->db_id == id) {
55 a varient of input packet that can be used in lock requeue
57 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
59 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
60 ctdb_input_pkt(ctdb, hdr);
/* Send a CTDB_REPLY_ERROR packet back to the requesting node, carrying a
 * printf-formatted message. Silently dropped when the transport is down.
 * NOTE(review): extract is mangled — leading numerals are stray line
 * numbers; the va_start/va_end lines and several braces are elided. */
67 static void ctdb_send_error(struct ctdb_context *ctdb,
68 struct ctdb_req_header *hdr, uint32_t status,
69 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
70 static void ctdb_send_error(struct ctdb_context *ctdb,
71 struct ctdb_req_header *hdr, uint32_t status,
75 struct ctdb_reply_error *r;
/* cannot reply without a live transport */
79 if (ctdb->methods == NULL) {
80 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
85 msg = talloc_vasprintf(ctdb, fmt, ap);
87 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* include the NUL terminator in the wire message */
91 msglen = strlen(msg)+1;
92 len = offsetof(struct ctdb_reply_error, msg);
93 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
94 struct ctdb_reply_error);
95 CTDB_NO_MEMORY_FATAL(ctdb, r);
/* route the reply back to the original sender, same reqid */
97 r->hdr.destnode = hdr->srcnode;
98 r->hdr.reqid = hdr->reqid;
101 memcpy(&r->msg[0], msg, msglen);
103 ctdb_queue_packet(ctdb, &r->hdr);
110 * send a redirect reply
112 * The logic behind this function is this:
114 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
115 * to its local ctdb (ctdb_request_call). If the node is not itself
116 * the record's DMASTER, it first redirects the packet to the
117 * record's LMASTER. The LMASTER then redirects the call packet to
118 * the current DMASTER. Note that this works because of this: When
119 * a record is migrated off a node, then the new DMASTER is stored
120 * in the record's copy on the former DMASTER.
122 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
123 struct ctdb_db_context *ctdb_db,
125 struct ctdb_req_call *c,
126 struct ctdb_ltdb_header *header)
128 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
/* default: bounce to the lmaster; if WE are the lmaster, forward
 * straight to the dmaster recorded in the local copy's header */
130 c->hdr.destnode = lmaster;
131 if (ctdb->pnn == lmaster) {
132 c->hdr.destnode = header->dmaster;
/* warn when a packet has been bounced around many times — likely a
 * hot record ping-ponging between nodes (checked once per 100 hops) */
136 if (c->hopcount%100 > 95) {
137 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
138 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
139 "header->dmaster:%d dst:%d\n",
140 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
141 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
142 header->dmaster, c->hdr.destnode));
145 ctdb_queue_packet(ctdb, &c->hdr);
/* As the lmaster, record the new dmaster in the local copy and send a
 * CTDB_REPLY_DMASTER (key+data+flags) to the node taking over. */
152 caller must have the chainlock before calling this routine. Caller must be
155 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
156 struct ctdb_ltdb_header *header,
157 TDB_DATA key, TDB_DATA data,
158 uint32_t new_dmaster,
161 struct ctdb_context *ctdb = ctdb_db->ctdb;
162 struct ctdb_reply_dmaster *r;
/* only the lmaster may hand out dmaster-ship */
166 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
167 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
/* persist the new dmaster locally before telling anyone else */
171 header->dmaster = new_dmaster;
172 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
174 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
178 if (ctdb->methods == NULL) {
179 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
183 /* put the packet on a temporary context, allowing us to safely free
184 it below even if ctdb_reply_dmaster() has freed it already */
185 tmp_ctx = talloc_new(ctdb);
187 /* send the CTDB_REPLY_DMASTER */
/* wire layout: key bytes, then data bytes, then the 32-bit record flags */
188 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
189 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
190 struct ctdb_reply_dmaster);
191 CTDB_NO_MEMORY_FATAL(ctdb, r);
193 r->hdr.destnode = new_dmaster;
194 r->hdr.reqid = reqid;
195 r->hdr.generation = ctdb_db->generation;
196 r->rsn = header->rsn;
197 r->keylen = key.dsize;
198 r->datalen = data.dsize;
199 r->db_id = ctdb_db->db_id;
200 memcpy(&r->data[0], key.dptr, key.dsize);
201 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
202 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
204 ctdb_queue_packet(ctdb, &r->hdr);
206 talloc_free(tmp_ctx);
210 send a dmaster request (give another node the dmaster for a record)
212 This is always sent to the lmaster, which ensures that the lmaster
213 always knows who the dmaster is. The lmaster will then send a
214 CTDB_REPLY_DMASTER to the new dmaster
216 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
217 struct ctdb_req_call *c,
218 struct ctdb_ltdb_header *header,
219 TDB_DATA *key, TDB_DATA *data)
221 struct ctdb_req_dmaster *r;
222 struct ctdb_context *ctdb = ctdb_db->ctdb;
224 uint32_t lmaster = ctdb_lmaster(ctdb, key);
226 if (ctdb->methods == NULL) {
227 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* mark records that migrate while carrying data */
231 if (data->dsize != 0) {
232 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* shortcut: if we ARE the lmaster, reply directly — no wire round-trip */
235 if (lmaster == ctdb->pnn) {
236 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
237 c->hdr.srcnode, c->hdr.reqid);
/* wire layout: key bytes, data bytes, then 32-bit record flags
 * NOTE(review): the "+ sizeof(uint32_t)" part of the len expression
 * is on an elided line in this extract */
241 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
243 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
244 struct ctdb_req_dmaster);
245 CTDB_NO_MEMORY_FATAL(ctdb, r);
246 r->hdr.destnode = lmaster;
247 r->hdr.reqid = c->hdr.reqid;
248 r->hdr.generation = ctdb_db->generation;
250 r->rsn = header->rsn;
251 r->dmaster = c->hdr.srcnode;
252 r->keylen = key->dsize;
253 r->datalen = data->dsize;
254 memcpy(&r->data[0], key->dptr, key->dsize);
255 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
256 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* update the local copy to point at the new dmaster before sending */
258 header->dmaster = c->hdr.srcnode;
259 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
260 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
263 ctdb_queue_packet(ctdb, &r->hdr);
/* Timer callback: the pindown period for a sticky record has expired —
 * free the pindown context, which releases any requests deferred on it. */
268 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
269 struct tevent_timer *te,
270 struct timeval t, void *private_data)
272 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
273 struct ctdb_sticky_record);
275 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
276 if (sr->pindown != NULL) {
277 talloc_free(sr->pindown);
/* After becoming dmaster of a sticky record, pin it down for
 * tunable.sticky_pindown milliseconds so it cannot migrate away again
 * immediately. No-op when no sticky record is registered for the key.
 * NOTE(review): return statements and some braces are elided from this
 * extract; return type is presumably int — confirm against upstream. */
283 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
285 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
287 struct ctdb_sticky_record *sr;
289 k = ctdb_key_to_idkey(tmp_ctx, key);
291 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
292 talloc_free(tmp_ctx);
/* look up the sticky-record entry for this key */
296 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
298 talloc_free(tmp_ctx);
302 talloc_free(tmp_ctx);
/* only create a pindown context if one is not already active */
304 if (sr->pindown == NULL) {
305 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
306 sr->pindown = talloc_new(sr);
307 if (sr->pindown == NULL) {
308 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* convert the millisecond tunable into (secs, usecs) for the timer */
311 tevent_add_timer(ctdb->ev, sr->pindown,
312 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
313 (ctdb->tunable.sticky_pindown * 1000) % 1000000),
314 ctdb_sticky_pindown_timeout, sr);
321 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
322 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
324 must be called with the chainlock held. This function releases the chainlock
326 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
327 struct ctdb_req_header *hdr,
328 TDB_DATA key, TDB_DATA data,
329 uint64_t rsn, uint32_t record_flags)
331 struct ctdb_call_state *state;
332 struct ctdb_context *ctdb = ctdb_db->ctdb;
333 struct ctdb_ltdb_header header;
336 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
/* build the new local header: we are now the dmaster */
340 header.dmaster = ctdb->pnn;
341 header.flags = record_flags;
/* find the pending call state this dmaster transfer belongs to */
343 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
346 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
348 * We temporarily add the VACUUM_MIGRATED flag to
349 * the record flags, so that ctdb_ltdb_store can
350 * decide whether the record should be stored or
353 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
357 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
358 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
360 ret = ctdb_ltdb_unlock(ctdb_db, key);
362 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
367 /* we just became DMASTER and this database is "sticky",
368 see if the record is flagged as "hot" and set up a pin-down
369 context to stop migrations for a little while if so
371 if (ctdb_db->sticky) {
372 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
/* error path: no matching reqid — drop the packet, release chainlock */
376 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
377 ctdb->pnn, hdr->reqid, hdr->srcnode));
379 ret = ctdb_ltdb_unlock(ctdb_db, key);
381 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* sanity: the key in the packet must match the key of the pending call */
386 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
387 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
389 ret = ctdb_ltdb_unlock(ctdb_db, key);
391 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
396 if (hdr->reqid != state->reqid) {
397 /* we found a record but it was the wrong one */
398 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
400 ret = ctdb_ltdb_unlock(ctdb_db, key);
402 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* we are dmaster now: run the deferred call locally, then complete */
407 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
409 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
411 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
414 state->state = CTDB_CALL_DONE;
415 if (state->async.fn) {
416 state->async.fn(state);
/* One deferred packet, kept on a doubly-linked list. */
420 struct dmaster_defer_call {
421 struct dmaster_defer_call *next, *prev;
422 struct ctdb_context *ctdb;
423 struct ctdb_req_header *hdr;
/* Queue of calls deferred while a dmaster update for a key is in flight.
 * NOTE(review): a generation member is referenced below but elided here. */
426 struct dmaster_defer_queue {
427 struct ctdb_db_context *ctdb_db;
429 struct dmaster_defer_call *deferred_calls;
/* Zero-delay timer callback: re-inject a previously deferred packet into
 * the normal input path. */
432 static void dmaster_defer_reprocess(struct tevent_context *ev,
433 struct tevent_timer *te,
437 struct dmaster_defer_call *call = talloc_get_type(
438 private_data, struct dmaster_defer_call);
440 ctdb_input_pkt(call->ctdb, call->hdr);
/* Destructor for a defer queue: reschedule every queued call via a
 * zero-delay timer (unless a recovery changed the generation, in which
 * case the calls will be resent by the client layer anyway). */
444 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
446 /* Ignore requests, if database recovery happens in-between. */
447 if (ddq->generation != ddq->ctdb_db->generation) {
451 while (ddq->deferred_calls != NULL) {
452 struct dmaster_defer_call *call = ddq->deferred_calls;
454 DLIST_REMOVE(ddq->deferred_calls, call);
/* detach the call from the dying queue before scheduling it */
456 talloc_steal(call->ctdb, call);
457 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
458 dmaster_defer_reprocess, call);
/* rb-tree insert callback for the defer_dmaster tree; body elided in this
 * extract (presumably frees any existing node data and returns parm). */
463 static void *insert_ddq_callback(void *parm, void *data)
472 * This function is used to register a key in the database that needs to be updated.
473 * Any requests for that key should get deferred till this is completed.
475 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
476 struct ctdb_req_header *hdr,
480 struct dmaster_defer_queue *ddq;
482 k = ctdb_key_to_idkey(hdr, key);
484 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
/* an up-to-date queue for this key already exists — nothing to do */
489 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
491 if (ddq->generation == ctdb_db->generation) {
496 /* Recovery occurred - get rid of old queue. All the deferred
497 * requests will be resent anyway from ctdb_call_resend_db.
/* create a fresh queue, owned by the triggering request header */
502 ddq = talloc(hdr, struct dmaster_defer_queue);
504 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
508 ddq->ctdb_db = ctdb_db;
509 ddq->generation = hdr->generation;
510 ddq->deferred_calls = NULL;
512 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
513 insert_ddq_callback, ddq);
/* destructor requeues any deferred calls when the queue goes away */
514 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
/* If a dmaster update is pending for this key, append the packet to the
 * key's defer queue; returns 0 on deferral (caller must not process the
 * packet further) and non-zero when no deferral applies — elided returns,
 * so confirm exact codes against upstream. */
520 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
521 struct ctdb_req_header *hdr,
524 struct dmaster_defer_queue *ddq;
525 struct dmaster_defer_call *call;
528 k = ctdb_key_to_idkey(hdr, key);
530 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
534 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
/* stale queue from before a recovery: disarm its destructor so the
 * queued calls are NOT resubmitted when it is freed */
542 if (ddq->generation != hdr->generation) {
543 talloc_set_destructor(ddq, NULL);
548 call = talloc(ddq, struct dmaster_defer_call);
550 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
554 call->ctdb = ctdb_db->ctdb;
/* take ownership of the packet so it survives until reprocessing */
555 call->hdr = talloc_steal(call, hdr);
557 DLIST_ADD_END(ddq->deferred_calls, call, NULL);
563 called when a CTDB_REQ_DMASTER packet comes in
565 this comes into the lmaster for a record when the current dmaster
566 wants to give up the dmaster role and give it to someone else
568 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
570 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
571 TDB_DATA key, data, data2;
572 struct ctdb_ltdb_header header;
573 struct ctdb_db_context *ctdb_db;
574 uint32_t record_flags = 0;
578 ctdb_db = find_ctdb_db(ctdb, c->db_id);
580 ctdb_send_error(ctdb, hdr, -1,
581 "Unknown database in request. db_id==0x%08x",
/* drop packets from a different recovery generation */
586 if (hdr->generation != ctdb_db->generation) {
588 ("ctdb operation %u request %u from node %u to %u had an"
589 " invalid generation:%u while our generation is:%u\n",
590 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
591 hdr->generation, ctdb_db->generation));
/* unpack key/data from the packet; trailing record flags are optional */
596 key.dsize = c->keylen;
597 data.dptr = c->data + c->keylen;
598 data.dsize = c->datalen;
599 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
601 if (len <= c->hdr.length) {
602 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
603 sizeof(record_flags));
/* from here on, defer any other requests touching this key */
606 dmaster_defer_setup(ctdb_db, hdr, key);
608 /* fetch the current record */
609 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
610 ctdb_call_input_pkt, ctdb, false);
612 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
616 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
/* only the lmaster may process dmaster handovers */
620 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
621 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
622 ctdb->pnn, ctdb_lmaster(ctdb, &key),
623 hdr->generation, ctdb->vnn_map->generation));
624 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
627 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
628 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
630 /* its a protocol error if the sending node is not the current dmaster */
631 if (header.dmaster != hdr->srcnode) {
632 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
633 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
634 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
635 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
636 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
/* inconsistent dmaster state — force a cluster recovery */
637 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
638 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
640 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
641 ctdb_ltdb_unlock(ctdb_db, key);
646 if (header.rsn > c->rsn) {
647 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
648 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
649 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
650 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
653 /* use the rsn from the sending node */
656 /* store the record flags from the sending node */
657 header.flags = record_flags;
659 /* check if the new dmaster is the lmaster, in which case we
660 skip the dmaster reply */
661 if (c->dmaster == ctdb->pnn) {
662 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
664 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
666 ret = ctdb_ltdb_unlock(ctdb_db, key);
668 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* Timer callback: stickiness duration expired — body elided in this
 * extract (presumably frees the sticky record). */
673 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
674 struct tevent_timer *te,
675 struct timeval t, void *private_data)
677 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
678 struct ctdb_sticky_record);
/* rb-tree insert callback: replace a pre-existing sticky-record entry
 * with the newly created one (remaining lines elided in this extract). */
682 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
685 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/* Register a key as "sticky" for tunable.sticky_duration seconds: insert
 * a ctdb_sticky_record into the db's rb-tree and arm its expiry timer.
 * NOTE(review): return statements are elided; return type presumably int. */
692 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
694 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
696 struct ctdb_sticky_record *sr;
698 k = ctdb_key_to_idkey(tmp_ctx, key);
700 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
701 talloc_free(tmp_ctx);
/* already sticky? then nothing more to do */
705 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
707 talloc_free(tmp_ctx);
711 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
713 talloc_free(tmp_ctx);
714 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
719 sr->ctdb_db = ctdb_db;
722 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
723 ctdb->tunable.sticky_duration,
724 ctdb_db->db_name, ctdb_hash(&key)));
726 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* expire the stickiness after sticky_duration seconds */
728 tevent_add_timer(ctdb->ev, sr,
729 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
730 ctdb_sticky_record_timeout, sr);
732 talloc_free(tmp_ctx);
/* Handle used to requeue a pinned-down packet via a zero-delay timer. */
736 struct pinned_down_requeue_handle {
737 struct ctdb_context *ctdb;
738 struct ctdb_req_header *hdr;
/* A request deferred because its record is pinned down; its destructor
 * (below) requeues the packet when the pindown context is freed. */
741 struct pinned_down_deferred_call {
742 struct ctdb_context *ctdb;
743 struct ctdb_req_header *hdr;
/* Zero-delay timer callback: hand the deferred packet back to the normal
 * input path (ownership of hdr moves to the ctdb context first). */
746 static void pinned_down_requeue(struct tevent_context *ev,
747 struct tevent_timer *te,
748 struct timeval t, void *private_data)
750 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
751 struct ctdb_context *ctdb = handle->ctdb;
753 talloc_steal(ctdb, handle->hdr);
754 ctdb_call_input_pkt(ctdb, handle->hdr);
/* Destructor for a pinned-down deferred call: when the pindown context
 * is freed, move the packet onto a requeue handle and schedule it for
 * immediate reprocessing. */
759 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
761 struct ctdb_context *ctdb = pinned_down->ctdb;
762 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
764 handle->ctdb = pinned_down->ctdb;
765 handle->hdr = pinned_down->hdr;
/* take the packet away from the dying deferred-call object */
766 talloc_steal(handle, handle->hdr);
768 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
769 pinned_down_requeue, handle);
/* If the key has an active pindown, park this request on the pindown
 * context; freeing that context (on timeout) requeues the packet via the
 * destructor above. Returns 0 when deferred — elided returns, so confirm
 * the non-deferred return value against upstream. */
775 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
777 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
779 struct ctdb_sticky_record *sr;
780 struct pinned_down_deferred_call *pinned_down;
782 k = ctdb_key_to_idkey(tmp_ctx, key);
784 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
785 talloc_free(tmp_ctx);
789 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
791 talloc_free(tmp_ctx);
795 talloc_free(tmp_ctx);
/* no active pindown: do not defer */
797 if (sr->pindown == NULL) {
/* parent the deferral on the pindown context so its lifetime bounds ours */
801 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
802 if (pinned_down == NULL) {
803 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
807 pinned_down->ctdb = ctdb;
808 pinned_down->hdr = hdr;
810 talloc_set_destructor(pinned_down, pinned_down_destructor);
811 talloc_steal(pinned_down, hdr);
/* Track the MAX_HOT_KEYS hottest keys of a database by hop count.
 * The array is kept with its smallest count at index 0 so a cheap
 * first-element check rejects most updates. */
817 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
821 /* smallest value is always at index 0 */
822 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
826 /* see if we already know this key */
827 for (i = 0; i < MAX_HOT_KEYS; i++) {
828 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
831 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
834 /* found an entry for this key */
835 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
838 ctdb_db->statistics.hot_keys[i].count = hopcount;
/* new key: take a free slot while one exists, else evict index 0 */
842 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
843 id = ctdb_db->statistics.num_hot_keys;
844 ctdb_db->statistics.num_hot_keys++;
849 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
850 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
852 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
853 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
854 ctdb_db->statistics.hot_keys[id].count = hopcount;
855 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
856 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
/* restore the invariant: swap the new minimum back into index 0
 * (key and hopcount are reused here as swap temporaries) */
859 for (i = 1; i < MAX_HOT_KEYS; i++) {
860 if (ctdb_db->statistics.hot_keys[i].count == 0) {
863 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
864 hopcount = ctdb_db->statistics.hot_keys[i].count;
865 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
866 ctdb_db->statistics.hot_keys[0].count = hopcount;
868 key = ctdb_db->statistics.hot_keys[i].key;
869 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
870 ctdb_db->statistics.hot_keys[0].key = key;
876 called when a CTDB_REQ_CALL packet comes in
878 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
880 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
882 struct ctdb_reply_call *r;
884 struct ctdb_ltdb_header header;
885 struct ctdb_call *call;
886 struct ctdb_db_context *ctdb_db;
887 int tmp_count, bucket;
/* drop silently when the transport is down */
889 if (ctdb->methods == NULL) {
890 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
894 ctdb_db = find_ctdb_db(ctdb, c->db_id);
896 ctdb_send_error(ctdb, hdr, -1,
897 "Unknown database in request. db_id==0x%08x",
/* reject packets from a different recovery generation */
902 if (hdr->generation != ctdb_db->generation) {
904 ("ctdb operation %u request %u from node %u to %u had an"
905 " invalid generation:%u while our generation is:%u\n",
906 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
907 hdr->generation, ctdb_db->generation));
/* unpack the call; key and call data share the packet's data blob */
911 call = talloc(hdr, struct ctdb_call);
912 CTDB_NO_MEMORY_FATAL(ctdb, call);
914 call->call_id = c->callid;
915 call->key.dptr = c->data;
916 call->key.dsize = c->keylen;
917 call->call_data.dptr = c->data + c->keylen;
918 call->call_data.dsize = c->calldatalen;
919 call->reply_data.dptr = NULL;
920 call->reply_data.dsize = 0;
923 /* If this record is pinned down we should defer the
924 request until the pindown times out
926 if (ctdb_db->sticky) {
927 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
929 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
/* also defer when a dmaster handover is in flight for this key */
935 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
940 /* determine if we are the dmaster for this key. This also
941 fetches the record data (if any), thus avoiding a 2nd fetch of the data
942 if the call will be answered locally */
944 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
945 ctdb_call_input_pkt, ctdb, false);
947 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
952 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
957 /* Dont do READONLY if we dont have a tracking database */
958 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
959 c->flags &= ~CTDB_WANT_READONLY;
/* a finished revoke: clear the RO flags and drop the tracking record */
962 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
963 header.flags &= ~CTDB_REC_RO_FLAGS;
964 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
965 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
966 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
967 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
969 /* and clear out the tracking data */
970 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
971 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
975 /* if we are revoking, we must defer all other calls until the revoke
978 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
979 talloc_free(data.dptr);
980 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
982 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
983 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
990 * If we are not the dmaster and are not hosting any delegations,
991 * then we redirect the request to the node than can answer it
992 * (the lmaster or the dmaster).
994 if ((header.dmaster != ctdb->pnn)
995 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
996 talloc_free(data.dptr);
997 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
999 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1001 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* a read-write request on a record with outstanding read-only
 * delegations: start the revoke process and defer this call */
1007 if ( (!(c->flags & CTDB_WANT_READONLY))
1008 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1009 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1010 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1011 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1013 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1015 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1016 ctdb_fatal(ctdb, "Failed to start record revoke");
1018 talloc_free(data.dptr);
1020 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1021 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1028 /* If this is the first request for delegation. bump rsn and set
1029 * the delegations flag
1031 if ((c->flags & CTDB_WANT_READONLY)
1032 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1033 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1035 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1036 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1037 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
/* read-only fast path: record the delegate in the tracking db and
 * answer directly with header+data, no migration */
1040 if ((c->flags & CTDB_WANT_READONLY)
1041 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1044 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1045 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1046 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1048 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1049 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1053 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1055 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1058 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1059 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1060 struct ctdb_reply_call);
1061 CTDB_NO_MEMORY_FATAL(ctdb, r);
1062 r->hdr.destnode = c->hdr.srcnode;
1063 r->hdr.reqid = c->hdr.reqid;
1064 r->hdr.generation = ctdb_db->generation;
1066 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
/* the copy sent to the delegate is marked read-only, not delegating */
1068 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1069 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1070 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1073 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1076 ctdb_queue_packet(ctdb, &r->hdr);
1077 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1078 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
/* hop-count statistics and hot-key tracking */
1085 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1086 tmp_count = c->hopcount;
1092 if (bucket >= MAX_COUNT_BUCKETS) {
1093 bucket = MAX_COUNT_BUCKETS - 1;
1095 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1096 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1097 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1099 /* If this database supports sticky records, then check if the
1100 hopcount is big. If it is it means the record is hot and we
1101 should make it sticky.
1103 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1104 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1108 /* Try if possible to migrate the record off to the caller node.
1109 * From the clients perspective a fetch of the data is just as
1110 * expensive as a migration.
1112 if (c->hdr.srcnode != ctdb->pnn) {
1113 if (ctdb_db->persistent_state) {
1114 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1115 " of key %s while transaction is active\n",
1116 (char *)call->key.dptr));
1118 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1119 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1120 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1121 talloc_free(data.dptr);
1123 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1125 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* local case: execute the call here and send the reply ourselves */
1132 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1134 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1138 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1140 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1143 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1144 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1145 struct ctdb_reply_call);
1146 CTDB_NO_MEMORY_FATAL(ctdb, r);
1147 r->hdr.destnode = hdr->srcnode;
1148 r->hdr.reqid = hdr->reqid;
1149 r->hdr.generation = ctdb_db->generation;
1150 r->status = call->status;
1151 r->datalen = call->reply_data.dsize;
1152 if (call->reply_data.dsize) {
1153 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1156 ctdb_queue_packet(ctdb, &r->hdr);
1163 * called when a CTDB_REPLY_CALL packet comes in
1165 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1166 * contains any reply data from the call
1168 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1170 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1171 struct ctdb_call_state *state;
/* look up the originating call state by reqid */
1173 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1174 if (state == NULL) {
1175 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1179 if (hdr->reqid != state->reqid) {
1180 /* we found a record but it was the wrong one */
1181 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1185 if (hdr->generation != state->generation) {
1187 ("ctdb operation %u request %u from node %u to %u had an"
1188 " invalid generation:%u while our generation is:%u\n",
1189 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1190 hdr->generation, state->generation));
1195 /* read only delegation processing */
1196 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1197 * delegation since we may need to update the record header
1199 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1200 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1201 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1202 struct ctdb_ltdb_header oldheader;
1203 TDB_DATA key, data, olddata;
/* not a read-only delegation — nothing to store locally */
1206 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1211 key.dsize = state->c->keylen;
1212 key.dptr = state->c->data;
1213 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1214 ctdb_call_input_pkt, ctdb, false);
1219 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1223 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1225 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1226 ctdb_ltdb_unlock(ctdb_db, key);
/* only store if the delegated copy is newer than what we hold */
1230 if (header->rsn <= oldheader.rsn) {
1231 ctdb_ltdb_unlock(ctdb_db, key);
/* the payload must at least contain the ltdb header */
1235 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1236 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1237 ctdb_ltdb_unlock(ctdb_db, key);
1241 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1242 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1243 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1245 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1246 ctdb_ltdb_unlock(ctdb_db, key);
1250 ctdb_ltdb_unlock(ctdb_db, key);
/* expose the reply to the caller; keep the packet alive via the state */
1254 state->call->reply_data.dptr = c->data;
1255 state->call->reply_data.dsize = c->datalen;
1256 state->call->status = c->status;
1258 talloc_steal(state, c);
1260 state->state = CTDB_CALL_DONE;
1261 if (state->async.fn) {
1262 state->async.fn(state);
1268 * called when a CTDB_REPLY_DMASTER packet comes in
1270 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1271 * request packet. It means that the current dmaster wants to give us
1274 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1276 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1277 struct ctdb_db_context *ctdb_db;
1279 uint32_t record_flags = 0;
1283 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1284 if (ctdb_db == NULL) {
1285 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1289 if (hdr->generation != ctdb_db->generation) {
1291 ("ctdb operation %u request %u from node %u to %u had an"
1292 " invalid generation:%u while our generation is:%u\n",
1293 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1294 hdr->generation, ctdb_db->generation));
1299 key.dsize = c->keylen;
1300 data.dptr = &c->data[key.dsize];
1301 data.dsize = c->datalen;
1302 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1304 if (len <= c->hdr.length) {
1305 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1306 sizeof(record_flags));
1309 dmaster_defer_setup(ctdb_db, hdr, key);
1311 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1312 ctdb_call_input_pkt, ctdb, false);
1317 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1321 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1326 called when a CTDB_REPLY_ERROR packet comes in
1328 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1330 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1331 struct ctdb_call_state *state;
1333 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1334 if (state == NULL) {
1335 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1336 ctdb->pnn, hdr->reqid));
1340 if (hdr->reqid != state->reqid) {
1341 /* we found a record but it was the wrong one */
1342 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1346 talloc_steal(state, c);
1348 state->state = CTDB_CALL_ERROR;
1349 state->errmsg = (char *)c->msg;
1350 if (state->async.fn) {
1351 state->async.fn(state);
1359 static int ctdb_call_destructor(struct ctdb_call_state *state)
1361 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1362 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1368 called when a ctdb_call needs to be resent after a reconfigure event
1370 static void ctdb_call_resend(struct ctdb_call_state *state)
1372 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1374 state->generation = state->ctdb_db->generation;
1376 /* use a new reqid, in case the old reply does eventually come in */
1377 reqid_remove(ctdb->idr, state->reqid);
1378 state->reqid = reqid_new(ctdb->idr, state);
1379 state->c->hdr.reqid = state->reqid;
1381 /* update the generation count for this request, so its valid with the new vnn_map */
1382 state->c->hdr.generation = state->generation;
1384 /* send the packet to ourselves, it will be redirected appropriately */
1385 state->c->hdr.destnode = ctdb->pnn;
1387 ctdb_queue_packet(ctdb, &state->c->hdr);
1388 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1389 state->ctdb_db->db_name, state->reqid, state->generation));
1393 resend all pending calls on recovery
1395 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1397 struct ctdb_call_state *state, *next;
1399 for (state = ctdb_db->pending_calls; state; state = next) {
1401 ctdb_call_resend(state);
1405 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1407 struct ctdb_db_context *ctdb_db;
1409 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1410 ctdb_call_resend_db(ctdb_db);
1415 this allows the caller to setup a async.fn
1417 static void call_local_trigger(struct tevent_context *ev,
1418 struct tevent_timer *te,
1419 struct timeval t, void *private_data)
1421 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1422 if (state->async.fn) {
1423 state->async.fn(state);
1429 construct an event driven local ctdb_call
1431 this is used so that locally processed ctdb_call requests are processed
1432 in an event driven manner
1434 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1435 struct ctdb_call *call,
1436 struct ctdb_ltdb_header *header,
1439 struct ctdb_call_state *state;
1440 struct ctdb_context *ctdb = ctdb_db->ctdb;
1443 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1444 CTDB_NO_MEMORY_NULL(ctdb, state);
1446 talloc_steal(state, data->dptr);
1448 state->state = CTDB_CALL_DONE;
1449 state->call = talloc(state, struct ctdb_call);
1450 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1451 *(state->call) = *call;
1452 state->ctdb_db = ctdb_db;
1454 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1456 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1459 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1460 call_local_trigger, state);
1467 make a remote ctdb call - async send. Called in daemon context.
1469 This constructs a ctdb_call request and queues it for processing.
1470 This call never blocks.
1472 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1473 struct ctdb_call *call,
1474 struct ctdb_ltdb_header *header)
1477 struct ctdb_call_state *state;
1478 struct ctdb_context *ctdb = ctdb_db->ctdb;
1480 if (ctdb->methods == NULL) {
1481 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1485 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1486 CTDB_NO_MEMORY_NULL(ctdb, state);
1487 state->call = talloc(state, struct ctdb_call);
1488 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1490 state->reqid = reqid_new(ctdb->idr, state);
1491 state->ctdb_db = ctdb_db;
1492 talloc_set_destructor(state, ctdb_call_destructor);
1494 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1495 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1496 struct ctdb_req_call);
1497 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1498 state->c->hdr.destnode = header->dmaster;
1500 /* this limits us to 16k outstanding messages - not unreasonable */
1501 state->c->hdr.reqid = state->reqid;
1502 state->c->hdr.generation = ctdb_db->generation;
1503 state->c->flags = call->flags;
1504 state->c->db_id = ctdb_db->db_id;
1505 state->c->callid = call->call_id;
1506 state->c->hopcount = 0;
1507 state->c->keylen = call->key.dsize;
1508 state->c->calldatalen = call->call_data.dsize;
1509 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1510 memcpy(&state->c->data[call->key.dsize],
1511 call->call_data.dptr, call->call_data.dsize);
1512 *(state->call) = *call;
1513 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1514 state->call->key.dptr = &state->c->data[0];
1516 state->state = CTDB_CALL_WAIT;
1517 state->generation = ctdb_db->generation;
1519 DLIST_ADD(ctdb_db->pending_calls, state);
1521 ctdb_queue_packet(ctdb, &state->c->hdr);
1527 make a remote ctdb call - async recv - called in daemon context
1529 This is called when the program wants to wait for a ctdb_call to complete and get the
1530 results. This call will block unless the call has already completed.
1532 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1534 while (state->state < CTDB_CALL_DONE) {
1535 tevent_loop_once(state->ctdb_db->ctdb->ev);
1537 if (state->state != CTDB_CALL_DONE) {
1538 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1543 if (state->call->reply_data.dsize) {
1544 call->reply_data.dptr = talloc_memdup(call,
1545 state->call->reply_data.dptr,
1546 state->call->reply_data.dsize);
1547 call->reply_data.dsize = state->call->reply_data.dsize;
1549 call->reply_data.dptr = NULL;
1550 call->reply_data.dsize = 0;
1552 call->status = state->call->status;
1559 send a keepalive packet to the other node
1561 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1563 struct ctdb_req_keepalive *r;
1565 if (ctdb->methods == NULL) {
1566 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1570 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1571 sizeof(struct ctdb_req_keepalive),
1572 struct ctdb_req_keepalive);
1573 CTDB_NO_MEMORY_FATAL(ctdb, r);
1574 r->hdr.destnode = destnode;
1577 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1579 ctdb_queue_packet(ctdb, &r->hdr);
1586 struct revokechild_deferred_call {
1587 struct ctdb_context *ctdb;
1588 struct ctdb_req_header *hdr;
1589 deferred_requeue_fn fn;
1593 struct revokechild_handle {
1594 struct revokechild_handle *next, *prev;
1595 struct ctdb_context *ctdb;
1596 struct ctdb_db_context *ctdb_db;
1597 struct tevent_fd *fde;
1604 struct revokechild_requeue_handle {
1605 struct ctdb_context *ctdb;
1606 struct ctdb_req_header *hdr;
1607 deferred_requeue_fn fn;
1611 static void deferred_call_requeue(struct tevent_context *ev,
1612 struct tevent_timer *te,
1613 struct timeval t, void *private_data)
1615 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1617 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1618 talloc_free(requeue_handle);
1621 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1623 struct ctdb_context *ctdb = deferred_call->ctdb;
1624 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1625 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1627 requeue_handle->ctdb = ctdb;
1628 requeue_handle->hdr = deferred_call->hdr;
1629 requeue_handle->fn = deferred_call->fn;
1630 requeue_handle->ctx = deferred_call->ctx;
1631 talloc_steal(requeue_handle, requeue_handle->hdr);
1633 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1634 tevent_add_timer(ctdb->ev, requeue_handle,
1635 timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
1636 deferred_call_requeue, requeue_handle);
1642 static int revokechild_destructor(struct revokechild_handle *rc)
1644 if (rc->fde != NULL) {
1645 talloc_free(rc->fde);
1648 if (rc->fd[0] != -1) {
1651 if (rc->fd[1] != -1) {
1654 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1656 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1660 static void revokechild_handler(struct tevent_context *ev,
1661 struct tevent_fd *fde,
1662 uint16_t flags, void *private_data)
1664 struct revokechild_handle *rc = talloc_get_type(private_data,
1665 struct revokechild_handle);
1669 ret = sys_read(rc->fd[0], &c, 1);
1671 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1677 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1686 struct ctdb_revoke_state {
1687 struct ctdb_db_context *ctdb_db;
1689 struct ctdb_ltdb_header *header;
1696 static void update_record_cb(struct ctdb_client_control_state *state)
1698 struct ctdb_revoke_state *revoke_state;
1702 if (state == NULL) {
1705 revoke_state = state->async.private_data;
1707 state->async.fn = NULL;
1708 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1709 if ((ret != 0) || (res != 0)) {
1710 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1711 revoke_state->status = -1;
1714 revoke_state->count--;
1715 if (revoke_state->count <= 0) {
1716 revoke_state->finished = 1;
1720 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1722 struct ctdb_revoke_state *revoke_state = private_data;
1723 struct ctdb_client_control_state *state;
1725 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1726 if (state == NULL) {
1727 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1728 revoke_state->status = -1;
1731 state->async.fn = update_record_cb;
1732 state->async.private_data = revoke_state;
1734 revoke_state->count++;
1738 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1739 struct tevent_timer *te,
1740 struct timeval yt, void *private_data)
1742 struct ctdb_revoke_state *state = private_data;
1744 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1745 state->finished = 1;
1749 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1751 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1752 struct ctdb_ltdb_header new_header;
1755 state->ctdb_db = ctdb_db;
1757 state->header = header;
1760 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1762 tevent_add_timer(ctdb->ev, state,
1763 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1764 ctdb_revoke_timeout_handler, state);
1766 while (state->finished == 0) {
1767 tevent_loop_once(ctdb->ev);
1770 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1771 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1775 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1776 ctdb_ltdb_unlock(ctdb_db, key);
1777 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1782 if (new_header.rsn > header->rsn) {
1783 ctdb_ltdb_unlock(ctdb_db, key);
1784 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1788 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1789 ctdb_ltdb_unlock(ctdb_db, key);
1790 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1796 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1797 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1799 if (state->status == 0) {
1801 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1803 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1804 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1806 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1807 ctdb_ltdb_unlock(ctdb_db, key);
1808 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1812 ctdb_ltdb_unlock(ctdb_db, key);
1819 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1822 struct revokechild_handle *rc;
1823 pid_t parent = getpid();
1826 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1827 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1830 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1831 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1835 tdata = tdb_fetch(ctdb_db->rottdb, key);
1836 if (tdata.dsize > 0) {
1840 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1846 rc->ctdb_db = ctdb_db;
1850 talloc_set_destructor(rc, revokechild_destructor);
1852 rc->key.dsize = key.dsize;
1853 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1854 if (rc->key.dptr == NULL) {
1855 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1862 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1868 rc->child = ctdb_fork(ctdb);
1869 if (rc->child == (pid_t)-1) {
1870 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1875 if (rc->child == 0) {
1878 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1880 ctdb_set_process_name("ctdb_revokechild");
1881 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1882 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1884 goto child_finished;
1887 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1890 sys_write(rc->fd[1], &c, 1);
1891 /* make sure we die when our parent dies */
1892 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1900 set_close_on_exec(rc->fd[0]);
1902 /* This is an active revokechild child process */
1903 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1905 rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1906 revokechild_handler, (void *)rc);
1907 if (rc->fde == NULL) {
1908 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1911 tevent_fd_set_auto_close(rc->fde);
1916 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1918 struct revokechild_handle *rc;
1919 struct revokechild_deferred_call *deferred_call;
1921 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1922 if (rc->key.dsize == 0) {
1925 if (rc->key.dsize != key.dsize) {
1928 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1934 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1938 deferred_call = talloc(rc, struct revokechild_deferred_call);
1939 if (deferred_call == NULL) {
1940 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1944 deferred_call->ctdb = ctdb;
1945 deferred_call->hdr = hdr;
1946 deferred_call->fn = fn;
1947 deferred_call->ctx = call_context;
1949 talloc_set_destructor(deferred_call, deferred_call_destructor);
1950 talloc_steal(deferred_call, hdr);