2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30 #include "common/reqid.h"
31 #include "common/system.h"
/* Bookkeeping for a "sticky" record: a hot record pinned to this node
 * to damp migration thrash.  NOTE(review): the remaining members (e.g.
 * the pindown context used below) and the closing brace of this struct
 * are not visible in this extract. */
33 struct ctdb_sticky_record {
34 struct ctdb_context *ctdb;
35 struct ctdb_db_context *ctdb_db;
40 find the ctdb_db from a db index
/* Walk the linked list of open databases looking for the one whose
 * db_id matches 'id'.  NOTE(review): the return statements and closing
 * braces are missing from this extract; presumably returns the match
 * or NULL. */
42 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
44 struct ctdb_db_context *ctdb_db;
46 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
47 if (ctdb_db->db_id == id) {
55 a variant of input packet that can be used in lock requeue
/* Thin adapter: recovers the ctdb context from the opaque (void *)
 * callback argument and feeds the packet back into the normal input
 * path.  Used as the requeue callback for deferred packets. */
57 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
59 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
60 ctdb_input_pkt(ctdb, hdr);
/* Send a CTDB_REPLY_ERROR back to the node that sent 'hdr'.  The
 * printf-style message is formatted into the reply payload.  If the
 * transport is down the error is logged and dropped. */
67 static void ctdb_send_error(struct ctdb_context *ctdb,
68 struct ctdb_req_header *hdr, uint32_t status,
69 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
70 static void ctdb_send_error(struct ctdb_context *ctdb,
71 struct ctdb_req_header *hdr, uint32_t status,
75 struct ctdb_reply_error *r;
79 if (ctdb->methods == NULL) {
/* no transport: nothing to do but log and drop the reply */
80 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
85 msg = talloc_vasprintf(ctdb, fmt, ap);
87 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* +1 so the NUL terminator travels with the message */
91 msglen = strlen(msg)+1;
92 len = offsetof(struct ctdb_reply_error, msg);
93 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
94 struct ctdb_reply_error);
95 CTDB_NO_MEMORY_FATAL(ctdb, r);
/* route the error back to the original sender, matching its reqid */
97 r->hdr.destnode = hdr->srcnode;
98 r->hdr.reqid = hdr->reqid;
101 memcpy(&r->msg[0], msg, msglen);
103 ctdb_queue_packet(ctdb, &r->hdr);
110 * send a redirect reply
112 * The logic behind this function is this:
114 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
115 * to its local ctdb (ctdb_request_call). If the node is not itself
116 * the record's DMASTER, it first redirects the packet to the
117 * record's LMASTER. The LMASTER then redirects the call packet to
118 * the current DMASTER. Note that this works because of this: When
119 * a record is migrated off a node, then the new DMASTER is stored
120 * in the record's copy on the former DMASTER.
122 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
123 struct ctdb_db_context *ctdb_db,
125 struct ctdb_req_call *c,
126 struct ctdb_ltdb_header *header)
128 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
/* default destination is the lmaster for this key */
130 c->hdr.destnode = lmaster;
131 if (ctdb->pnn == lmaster) {
/* we ARE the lmaster, so we know the current dmaster directly */
132 c->hdr.destnode = header->dmaster;
/* warn periodically on very high hopcounts - a sign of a hot record
   or of a call chasing a stale dmaster chain around the cluster */
136 if (c->hopcount%100 > 95) {
137 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
138 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
139 "header->dmaster:%d dst:%d\n",
140 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
141 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
142 header->dmaster, c->hdr.destnode));
145 ctdb_queue_packet(ctdb, &c->hdr);
152 caller must have the chainlock before calling this routine. Caller must be
/* Hand dmaster-ship of a record to new_dmaster via CTDB_REPLY_DMASTER.
 * Must run on the lmaster with the chainlock held (checked below).
 * The local copy is updated to record the new dmaster before the
 * reply is sent. */
155 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
156 struct ctdb_ltdb_header *header,
157 TDB_DATA key, TDB_DATA data,
158 uint32_t new_dmaster,
161 struct ctdb_context *ctdb = ctdb_db->ctdb;
162 struct ctdb_reply_dmaster *r;
166 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
167 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
/* persist the new dmaster in our local copy before replying */
171 header->dmaster = new_dmaster;
172 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
174 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
178 if (ctdb->methods == NULL) {
179 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
183 /* put the packet on a temporary context, allowing us to safely free
184 it below even if ctdb_reply_dmaster() has freed it already */
185 tmp_ctx = talloc_new(ctdb);
187 /* send the CTDB_REPLY_DMASTER */
/* payload layout: key bytes, then data bytes, then 32-bit record flags */
188 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
189 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
190 struct ctdb_reply_dmaster);
191 CTDB_NO_MEMORY_FATAL(ctdb, r);
193 r->hdr.destnode = new_dmaster;
194 r->hdr.reqid = reqid;
195 r->hdr.generation = ctdb_db->generation;
196 r->rsn = header->rsn;
197 r->keylen = key.dsize;
198 r->datalen = data.dsize;
199 r->db_id = ctdb_db->db_id;
200 memcpy(&r->data[0], key.dptr, key.dsize);
201 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
202 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
204 ctdb_queue_packet(ctdb, &r->hdr);
206 talloc_free(tmp_ctx);
210 send a dmaster request (give another node the dmaster for a record)
212 This is always sent to the lmaster, which ensures that the lmaster
213 always knows who the dmaster is. The lmaster will then send a
214 CTDB_REPLY_DMASTER to the new dmaster
216 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
217 struct ctdb_req_call *c,
218 struct ctdb_ltdb_header *header,
219 TDB_DATA *key, TDB_DATA *data)
221 struct ctdb_req_dmaster *r;
222 struct ctdb_context *ctdb = ctdb_db->ctdb;
224 uint32_t lmaster = ctdb_lmaster(ctdb, key);
226 if (ctdb->methods == NULL) {
227 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* remember that this record migrated carrying data */
231 if (data->dsize != 0) {
232 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* if we are the lmaster we can short-circuit and reply directly */
235 if (lmaster == ctdb->pnn) {
236 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
237 c->hdr.srcnode, c->hdr.reqid);
/* payload layout: key bytes, then data bytes, then 32-bit record flags */
241 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
243 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
244 struct ctdb_req_dmaster);
245 CTDB_NO_MEMORY_FATAL(ctdb, r);
246 r->hdr.destnode = lmaster;
247 r->hdr.reqid = c->hdr.reqid;
248 r->hdr.generation = ctdb_db->generation;
250 r->rsn = header->rsn;
/* the node that made the call becomes the new dmaster */
251 r->dmaster = c->hdr.srcnode;
252 r->keylen = key->dsize;
253 r->datalen = data->dsize;
254 memcpy(&r->data[0], key->dptr, key->dsize);
255 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
256 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* update our local copy to point at the new dmaster before sending */
258 header->dmaster = c->hdr.srcnode;
259 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
260 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
263 ctdb_queue_packet(ctdb, &r->hdr);
/* Timer callback: the pindown period for a sticky record has expired.
 * Freeing the pindown talloc context ends the pindown (and, presumably,
 * releases any requests parked under it - TODO confirm against the
 * pinned-down destructor below). */
268 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
269 struct timeval t, void *private_data)
271 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
272 struct ctdb_sticky_record);
274 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
275 if (sr->pindown != NULL) {
276 talloc_free(sr->pindown);
/* Having just become dmaster of a record in a sticky database, pin the
 * record down for tunable.sticky_pindown milliseconds: allocate a
 * pindown context on the sticky-record entry and arm a timer that
 * frees it. */
282 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
284 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
286 struct ctdb_sticky_record *sr;
288 k = ctdb_key_to_idkey(tmp_ctx, key);
290 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
291 talloc_free(tmp_ctx);
/* look up the sticky-record entry for this key in the rb-tree */
295 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
297 talloc_free(tmp_ctx);
301 talloc_free(tmp_ctx);
303 if (sr->pindown == NULL) {
304 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
305 sr->pindown = talloc_new(sr);
306 if (sr->pindown == NULL) {
307 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* sticky_pindown is in milliseconds; split into seconds/microseconds */
310 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
317 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
318 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
320 must be called with the chainlock held. This function releases the chainlock
322 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
323 struct ctdb_req_header *hdr,
324 TDB_DATA key, TDB_DATA data,
325 uint64_t rsn, uint32_t record_flags)
327 struct ctdb_call_state *state;
328 struct ctdb_context *ctdb = ctdb_db->ctdb;
329 struct ctdb_ltdb_header header;
332 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
/* build the new local header: we are now the dmaster */
336 header.dmaster = ctdb->pnn;
337 header.flags = record_flags;
/* match the reply to the call that triggered the migration */
339 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
342 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
344 * We temporarily add the VACUUM_MIGRATED flag to
345 * the record flags, so that ctdb_ltdb_store can
346 * decide whether the record should be stored or
349 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
353 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
354 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
/* we hold the chainlock on entry and must release it on every path */
356 ret = ctdb_ltdb_unlock(ctdb_db, key);
358 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
363 /* we just became DMASTER and this database is "sticky",
364 see if the record is flagged as "hot" and set up a pin-down
365 context to stop migrations for a little while if so
367 if (ctdb_db->sticky) {
368 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
/* error path: no call state found for this reqid */
372 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
373 ctdb->pnn, hdr->reqid, hdr->srcnode));
375 ret = ctdb_ltdb_unlock(ctdb_db, key);
377 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* sanity check: the key in the packet must match the key of the
   pending call we looked up by reqid */
382 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
383 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
385 ret = ctdb_ltdb_unlock(ctdb_db, key);
387 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
392 if (hdr->reqid != state->reqid) {
393 /* we found a record but it was the wrong one */
394 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode))-
396 ret = ctdb_ltdb_unlock(ctdb_db, key);
398 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* now that we are dmaster, run the call locally and complete it */
403 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
405 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
407 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
410 state->state = CTDB_CALL_DONE;
411 if (state->async.fn) {
412 state->async.fn(state);
/* One request deferred while a dmaster update for its key is in
 * flight; kept on a doubly-linked list. */
416 struct dmaster_defer_call {
417 struct dmaster_defer_call *next, *prev;
418 struct ctdb_context *ctdb;
419 struct ctdb_req_header *hdr;
/* Per-key queue of deferred calls, indexed from ctdb_db->defer_dmaster.
 * NOTE(review): a 'generation' member is referenced below but not
 * visible in this extract. */
422 struct dmaster_defer_queue {
423 struct ctdb_db_context *ctdb_db;
425 struct dmaster_defer_call *deferred_calls;
/* Timer callback: re-inject a previously deferred packet into the
 * normal input path once the dmaster update has completed. */
428 static void dmaster_defer_reprocess(struct tevent_context *ev,
429 struct tevent_timer *te,
433 struct dmaster_defer_call *call = talloc_get_type(
434 private_data, struct dmaster_defer_call);
436 ctdb_input_pkt(call->ctdb, call->hdr);
/* talloc destructor for a dmaster defer queue: when the queue goes
 * away, requeue every deferred call via a zero-delay timer - unless a
 * recovery changed the database generation, in which case the calls
 * will be resent by their originators anyway. */
440 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
442 /* Ignore requests, if database recovery happens in-between. */
443 if (ddq->generation != ddq->ctdb_db->generation) {
447 while (ddq->deferred_calls != NULL) {
448 struct dmaster_defer_call *call = ddq->deferred_calls;
450 DLIST_REMOVE(ddq->deferred_calls, call);
/* move the call off the dying queue before scheduling it */
452 talloc_steal(call->ctdb, call);
453 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
454 dmaster_defer_reprocess, call);
/* rb-tree insert callback for defer queues.  NOTE(review): the body is
 * missing from this extract; presumably returns the value to store
 * (parm), replacing any existing entry (data). */
459 static void *insert_ddq_callback(void *parm, void *data)
468 * This function is used to register a key in the database that needs to be updated.
469 * Any requests for that key should get deferred till this is completed.
471 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
472 struct ctdb_req_header *hdr,
476 struct dmaster_defer_queue *ddq;
478 k = ctdb_key_to_idkey(hdr, key);
480 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
/* a queue may already exist for this key from an earlier update */
485 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
487 if (ddq->generation == ctdb_db->generation) {
492 /* Recovery occurred - get rid of old queue. All the deferred
493 * requests will be resent anyway from ctdb_call_resend_db.
/* create a fresh queue hung off the request header */
498 ddq = talloc(hdr, struct dmaster_defer_queue);
500 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
504 ddq->ctdb_db = ctdb_db;
505 ddq->generation = hdr->generation;
506 ddq->deferred_calls = NULL;
508 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
509 insert_ddq_callback, ddq);
/* the destructor requeues any deferred calls when the queue is freed */
510 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
/* If a dmaster update is in flight for this key, park the incoming
 * request on the key's defer queue.  Returns 0 on successful deferral
 * (caller must stop processing); non-zero paths are not fully visible
 * in this extract. */
516 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
517 struct ctdb_req_header *hdr,
520 struct dmaster_defer_queue *ddq;
521 struct dmaster_defer_call *call;
524 k = ctdb_key_to_idkey(hdr, key);
526 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
530 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
/* stale queue from before a recovery: disarm its destructor so it
   does not requeue anything */
538 if (ddq->generation != hdr->generation) {
539 talloc_set_destructor(ddq, NULL);
544 call = talloc(ddq, struct dmaster_defer_call);
546 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
550 call->ctdb = ctdb_db->ctdb;
/* take ownership of the packet so it survives until reprocessing */
551 call->hdr = talloc_steal(call, hdr);
553 DLIST_ADD_END(ddq->deferred_calls, call, NULL);
559 called when a CTDB_REQ_DMASTER packet comes in
561 this comes into the lmaster for a record when the current dmaster
562 wants to give up the dmaster role and give it to someone else
564 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
566 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
567 TDB_DATA key, data, data2;
568 struct ctdb_ltdb_header header;
569 struct ctdb_db_context *ctdb_db;
570 uint32_t record_flags = 0;
574 ctdb_db = find_ctdb_db(ctdb, c->db_id);
576 ctdb_send_error(ctdb, hdr, -1,
577 "Unknown database in request. db_id==0x%08x",
/* drop requests from a different database generation (recovery) */
582 if (hdr->generation != ctdb_db->generation) {
584 ("ctdb operation %u request %u from node %u to %u had an"
585 " invalid generation:%u while our generation is:%u\n",
586 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
587 hdr->generation, ctdb_db->generation));
/* unpack the wire payload: key, then data, then optional flags */
592 key.dsize = c->keylen;
593 data.dptr = c->data + c->keylen;
594 data.dsize = c->datalen;
595 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
/* record flags are only present if the packet is long enough */
597 if (len <= c->hdr.length) {
598 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
599 sizeof(record_flags));
/* defer any further requests for this key until we are done */
602 dmaster_defer_setup(ctdb_db, hdr, key);
604 /* fetch the current record */
605 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
606 ctdb_call_input_pkt, ctdb, false);
608 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
612 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
/* only the lmaster may process a dmaster request */
616 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
617 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
618 ctdb->pnn, ctdb_lmaster(ctdb, &key),
619 hdr->generation, ctdb->vnn_map->generation));
620 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
623 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
624 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
626 /* its a protocol error if the sending node is not the current dmaster */
627 if (header.dmaster != hdr->srcnode) {
628 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
629 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
630 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
631 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
632 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
/* a non-empty or foreign-owned record from a non-master means the
   cluster state is inconsistent - force a recovery */
633 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
634 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
636 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
637 ctdb_ltdb_unlock(ctdb_db, key);
642 if (header.rsn > c->rsn) {
643 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
644 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
645 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
646 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
649 /* use the rsn from the sending node */
652 /* store the record flags from the sending node */
653 header.flags = record_flags;
655 /* check if the new dmaster is the lmaster, in which case we
656 skip the dmaster reply */
657 if (c->dmaster == ctdb->pnn) {
/* ctdb_become_dmaster releases the chainlock itself */
658 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
660 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
662 ret = ctdb_ltdb_unlock(ctdb_db, key);
664 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* Timer callback: a record's sticky period (tunable.sticky_duration)
 * has expired.  NOTE(review): the body freeing the sticky record is
 * missing from this extract. */
669 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
670 struct timeval t, void *private_data)
672 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
673 struct ctdb_sticky_record);
/* rb-tree insert callback for sticky records: an entry already exists
 * for this key, so the old one (data) is replaced by the new (parm).
 * NOTE(review): the free/return statements are missing from this
 * extract. */
677 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
680 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/* Mark a hot record as "sticky" for tunable.sticky_duration seconds:
 * insert a sticky-record entry into the database's rb-tree and arm a
 * timeout that removes it again. */
687 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
689 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
691 struct ctdb_sticky_record *sr;
693 k = ctdb_key_to_idkey(tmp_ctx, key);
695 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
696 talloc_free(tmp_ctx);
/* already sticky?  nothing to do */
700 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
702 talloc_free(tmp_ctx);
706 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
708 talloc_free(tmp_ctx);
709 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
714 sr->ctdb_db = ctdb_db;
717 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
718 ctdb->tunable.sticky_duration,
719 ctdb_db->db_name, ctdb_hash(&key)));
721 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* unstick automatically after sticky_duration seconds */
723 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
725 talloc_free(tmp_ctx);
/* Handle used to requeue a request once a pindown is released. */
729 struct pinned_down_requeue_handle {
730 struct ctdb_context *ctdb;
731 struct ctdb_req_header *hdr;
/* A request parked while its record is pinned down; its destructor
 * (pinned_down_destructor below) requeues it when the pindown context
 * is freed. */
734 struct pinned_down_deferred_call {
735 struct ctdb_context *ctdb;
736 struct ctdb_req_header *hdr;
/* Timer callback: re-inject a request that was deferred by a pindown
 * back into the normal input path. */
739 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
740 struct timeval t, void *private_data)
742 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
743 struct ctdb_context *ctdb = handle->ctdb;
/* hand the packet back to the main context before reprocessing */
745 talloc_steal(ctdb, handle->hdr);
746 ctdb_call_input_pkt(ctdb, handle->hdr);
/* talloc destructor: fires when the pindown context is freed (timeout
 * or unstick) and schedules the deferred request for reprocessing via
 * a zero-delay timer. */
751 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
753 struct ctdb_context *ctdb = pinned_down->ctdb;
754 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
756 handle->ctdb = pinned_down->ctdb;
757 handle->hdr = pinned_down->hdr;
/* move the packet onto the handle so it outlives pinned_down */
758 talloc_steal(handle, handle->hdr);
760 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
/* If the record for 'key' is currently pinned down, defer the request
 * by hanging it off the pindown context; it is requeued automatically
 * (via pinned_down_destructor) when the pindown ends.  Returns 0 when
 * the request was deferred; non-zero return paths are not fully
 * visible in this extract. */
766 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
768 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
770 struct ctdb_sticky_record *sr;
771 struct pinned_down_deferred_call *pinned_down;
773 k = ctdb_key_to_idkey(tmp_ctx, key);
775 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
776 talloc_free(tmp_ctx);
780 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
782 talloc_free(tmp_ctx);
786 talloc_free(tmp_ctx);
/* no active pindown: nothing to defer against */
788 if (sr->pindown == NULL) {
/* parent the deferred call on the pindown context so freeing the
   pindown triggers the destructor and requeues the request */
792 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
793 if (pinned_down == NULL) {
794 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
798 pinned_down->ctdb = ctdb;
799 pinned_down->hdr = hdr;
801 talloc_set_destructor(pinned_down, pinned_down_destructor);
802 talloc_steal(pinned_down, hdr);
/* Maintain the per-database table of MAX_HOT_KEYS hottest keys, using
 * hopcount as the heat metric.  The table keeps the smallest count at
 * index 0 so cheap early rejection is possible; after insertion the
 * minimum is re-established by the swap loop at the end. */
808 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
812 /* smallest value is always at index 0 */
813 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
817 /* see if we already know this key */
818 for (i = 0; i < MAX_HOT_KEYS; i++) {
819 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
822 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
825 /* found an entry for this key */
826 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
829 ctdb_db->statistics.hot_keys[i].count = hopcount;
/* key not known yet: take a free slot if any, else replace slot 0
   (the current minimum) */
833 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
834 id = ctdb_db->statistics.num_hot_keys;
835 ctdb_db->statistics.num_hot_keys++;
/* free any key data from a previously evicted entry */
840 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
841 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
843 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
844 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
845 ctdb_db->statistics.hot_keys[id].count = hopcount;
846 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
847 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
/* restore the invariant: move the smallest count back to index 0
   (note: 'hopcount' and 'key' are reused here as swap temporaries) */
850 for (i = 1; i < MAX_HOT_KEYS; i++) {
851 if (ctdb_db->statistics.hot_keys[i].count == 0) {
854 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
855 hopcount = ctdb_db->statistics.hot_keys[i].count;
856 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
857 ctdb_db->statistics.hot_keys[0].count = hopcount;
859 key = ctdb_db->statistics.hot_keys[i].key;
860 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
861 ctdb_db->statistics.hot_keys[0].key = key;
867 called when a CTDB_REQ_CALL packet comes in
869 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
871 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
873 struct ctdb_reply_call *r;
875 struct ctdb_ltdb_header header;
876 struct ctdb_call *call;
877 struct ctdb_db_context *ctdb_db;
878 int tmp_count, bucket;
880 if (ctdb->methods == NULL) {
881 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
885 ctdb_db = find_ctdb_db(ctdb, c->db_id);
887 ctdb_send_error(ctdb, hdr, -1,
888 "Unknown database in request. db_id==0x%08x",
/* drop requests from a different database generation (recovery) */
893 if (hdr->generation != ctdb_db->generation) {
895 ("ctdb operation %u request %u from node %u to %u had an"
896 " invalid generation:%u while our generation is:%u\n",
897 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
898 hdr->generation, ctdb_db->generation));
/* unpack the call from the wire packet: key, then call data */
902 call = talloc(hdr, struct ctdb_call);
903 CTDB_NO_MEMORY_FATAL(ctdb, call);
905 call->call_id = c->callid;
906 call->key.dptr = c->data;
907 call->key.dsize = c->keylen;
908 call->call_data.dptr = c->data + c->keylen;
909 call->call_data.dsize = c->calldatalen;
910 call->reply_data.dptr = NULL;
911 call->reply_data.dsize = 0;
914 /* If this record is pinned down we should defer the
915 request until the pindown times out
917 if (ctdb_db->sticky) {
918 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
920 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
/* likewise defer if a dmaster update for this key is in flight */
926 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
931 /* determine if we are the dmaster for this key. This also
932 fetches the record data (if any), thus avoiding a 2nd fetch of the data
933 if the call will be answered locally */
935 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
936 ctdb_call_input_pkt, ctdb, false);
938 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
943 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
948 /* Don't do READONLY if we don't have a tracking database */
949 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
950 c->flags &= ~CTDB_WANT_READONLY;
/* a completed read-only revoke: clear the RO flags and drop the
   delegation tracking record */
953 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
954 header.flags &= ~CTDB_REC_RO_FLAGS;
955 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
956 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
957 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
958 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
960 /* and clear out the tracking data */
961 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
962 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
966 /* if we are revoking, we must defer all other calls until the revoke
969 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
970 talloc_free(data.dptr);
971 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
973 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
974 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
981 * If we are not the dmaster and are not hosting any delegations,
982 * then we redirect the request to the node that can answer it
983 * (the lmaster or the dmaster).
985 if ((header.dmaster != ctdb->pnn)
986 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
987 talloc_free(data.dptr);
988 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
990 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
992 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* an exclusive (non-readonly) request while delegations exist:
   start the revoke process and defer this call until it finishes */
998 if ( (!(c->flags & CTDB_WANT_READONLY))
999 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1000 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1001 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1002 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1004 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1006 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1007 ctdb_fatal(ctdb, "Failed to start record revoke");
1009 talloc_free(data.dptr);
1011 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1012 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1019 /* If this is the first request for delegation. bump rsn and set
1020 * the delegations flag
1022 if ((c->flags & CTDB_WANT_READONLY)
1023 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1024 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1026 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1027 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1028 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
/* record which node received a read-only delegation in the
   tracking database so it can be revoked later */
1031 if ((c->flags & CTDB_WANT_READONLY)
1032 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1035 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1036 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1037 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1039 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1040 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1044 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1046 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* build the read-only delegation reply: header followed by data */
1049 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1050 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1051 struct ctdb_reply_call);
1052 CTDB_NO_MEMORY_FATAL(ctdb, r);
1053 r->hdr.destnode = c->hdr.srcnode;
1054 r->hdr.reqid = c->hdr.reqid;
1055 r->hdr.generation = ctdb_db->generation;
1057 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
/* the copy sent out is a read-only delegation, not the master copy */
1059 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1060 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1061 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1064 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1067 ctdb_queue_packet(ctdb, &r->hdr);
1068 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1069 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
/* update hopcount statistics for this call */
1076 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1077 tmp_count = c->hopcount;
1083 if (bucket >= MAX_COUNT_BUCKETS) {
1084 bucket = MAX_COUNT_BUCKETS - 1;
1086 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1087 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1088 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1090 /* If this database supports sticky records, then check if the
1091 hopcount is big. If it is it means the record is hot and we
1092 should make it sticky.
1094 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1095 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1099 /* Try if possible to migrate the record off to the caller node.
1100 * From the clients perspective a fetch of the data is just as
1101 * expensive as a migration.
1103 if (c->hdr.srcnode != ctdb->pnn) {
1104 if (ctdb_db->persistent_state) {
1105 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1106 " of key %s while transaction is active\n",
1107 (char *)call->key.dptr));
1109 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1110 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1111 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1112 talloc_free(data.dptr);
1114 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1116 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* answer the call locally - we are the dmaster */
1123 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1125 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1129 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1131 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* send the reply carrying the call's result back to the caller */
1134 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1135 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1136 struct ctdb_reply_call);
1137 CTDB_NO_MEMORY_FATAL(ctdb, r);
1138 r->hdr.destnode = hdr->srcnode;
1139 r->hdr.reqid = hdr->reqid;
1140 r->hdr.generation = ctdb_db->generation;
1141 r->status = call->status;
1142 r->datalen = call->reply_data.dsize;
1143 if (call->reply_data.dsize) {
1144 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1147 ctdb_queue_packet(ctdb, &r->hdr);
1154 * called when a CTDB_REPLY_CALL packet comes in
1156 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1157 * contains any reply data from the call
1159 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1161 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1162 struct ctdb_call_state *state;
/* match the reply to the pending call by reqid */
1164 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1165 if (state == NULL) {
1166 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1170 if (hdr->reqid != state->reqid) {
1171 /* we found a record but it was the wrong one */
1172 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
/* drop replies from a different database generation (recovery) */
1176 if (hdr->generation != state->generation) {
1178 ("ctdb operation %u request %u from node %u to %u had an"
1179 " invalid generation:%u while our generation is:%u\n",
1180 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1181 hdr->generation, state->generation));
1186 /* read only delegation processing */
1187 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1188 * delegation since we may need to update the record header
1190 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1191 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* the reply payload starts with the record's ltdb header */
1192 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1193 struct ctdb_ltdb_header oldheader;
1194 TDB_DATA key, data, olddata;
/* not a read-only copy: no header update needed */
1197 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1202 key.dsize = state->c->keylen;
1203 key.dptr = state->c->data;
1204 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1205 ctdb_call_input_pkt, ctdb, false);
1210 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1214 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1216 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1217 ctdb_ltdb_unlock(ctdb_db, key);
/* only store if the incoming copy is newer than what we hold */
1221 if (header->rsn <= oldheader.rsn) {
1222 ctdb_ltdb_unlock(ctdb_db, key);
/* validate the payload is big enough to contain a header */
1226 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1227 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1228 ctdb_ltdb_unlock(ctdb_db, key);
1232 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1233 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1234 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1236 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1237 ctdb_ltdb_unlock(ctdb_db, key);
1241 ctdb_ltdb_unlock(ctdb_db, key);
/* hand the reply data to the waiting caller; steal the packet so the
   reply_data pointers stay valid for the state's lifetime */
1245 state->call->reply_data.dptr = c->data;
1246 state->call->reply_data.dsize = c->datalen;
1247 state->call->status = c->status;
1249 talloc_steal(state, c);
1251 state->state = CTDB_CALL_DONE;
1252 if (state->async.fn) {
1253 state->async.fn(state);
1259 * called when a CTDB_REPLY_DMASTER packet comes in
1261 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1262 * request packet. It means that the current dmaster wants to give us
/*
 * Handle an incoming CTDB_REPLY_DMASTER packet: the lmaster is handing
 * this node the dmaster role for a record. Validates the database id and
 * generation, extracts key/data/record_flags from the packet, takes the
 * chain lock (requeueing on contention) and promotes this node via
 * ctdb_become_dmaster().
 *
 * NOTE(review): some lines (key.dptr assignment, error-path returns,
 * closing braces) are elided in this view.
 */
1265 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1267 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1268 struct ctdb_db_context *ctdb_db;
1270 uint32_t record_flags = 0;
1274 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1275 if (ctdb_db == NULL) {
1276 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
/* drop packets from a different recovery generation */
1280 if (hdr->generation != ctdb_db->generation) {
1282 ("ctdb operation %u request %u from node %u to %u had an"
1283 " invalid generation:%u while our generation is:%u\n",
1284 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1285 hdr->generation, ctdb_db->generation));
/* packet layout: key bytes, then data bytes, then (optionally)
 * a trailing record_flags word */
1290 key.dsize = c->keylen;
1291 data.dptr = &c->data[key.dsize];
1292 data.dsize = c->datalen;
1293 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
/* only read record_flags if the packet is long enough to carry them */
1295 if (len <= c->hdr.length) {
1296 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1297 sizeof(record_flags));
1300 dmaster_defer_setup(ctdb_db, hdr, key);
/* requeue through ctdb_call_input_pkt if the chain lock is contended */
1302 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1303 ctdb_call_input_pkt, ctdb, false);
1308 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
/* take over as dmaster for this record with the supplied rsn/flags */
1312 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1317 called when a CTDB_REPLY_ERROR packet comes in
/*
 * Handle an incoming CTDB_REPLY_ERROR packet: find the pending call by
 * reqid, mark it failed with the error message from the packet, and fire
 * the async completion callback.
 */
1319 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1321 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1322 struct ctdb_call_state *state;
1324 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1325 if (state == NULL) {
1326 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1327 ctdb->pnn, hdr->reqid));
/* guard against a stale/reused reqid mapping to the wrong state */
1331 if (hdr->reqid != state->reqid) {
1332 /* we found a record but it was the wrong one */
1333 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
/* keep the packet alive as long as the state, since errmsg points
 * into it */
1337 talloc_steal(state, c);
1339 state->state = CTDB_CALL_ERROR;
1340 state->errmsg = (char *)c->msg;
1341 if (state->async.fn) {
1342 state->async.fn(state);
/*
 * Talloc destructor for a pending ctdb_call state: unlink it from the
 * database's pending-call list and release its request id.
 */
1350 static int ctdb_call_destructor(struct ctdb_call_state *state)
1352 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1353 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1359 called when a ctdb_call needs to be resent after a reconfigure event
/*
 * Resend a pending ctdb_call after a recovery/reconfigure event.
 * Allocates a fresh reqid (so a late reply to the old id is harmlessly
 * dropped), stamps the new generation on the request, and re-injects the
 * packet via ourselves so normal routing redirects it.
 */
1361 static void ctdb_call_resend(struct ctdb_call_state *state)
1363 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1365 state->generation = state->ctdb_db->generation;
1367 /* use a new reqid, in case the old reply does eventually come in */
1368 reqid_remove(ctdb->idr, state->reqid);
1369 state->reqid = reqid_new(ctdb->idr, state);
1370 state->c->hdr.reqid = state->reqid;
1372 /* update the generation count for this request, so its valid with the new vnn_map */
1373 state->c->hdr.generation = state->generation;
1375 /* send the packet to ourselves, it will be redirected appropriately */
1376 state->c->hdr.destnode = ctdb->pnn;
1378 ctdb_queue_packet(ctdb, &state->c->hdr);
1379 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1380 state->ctdb_db->db_name, state->reqid, state->generation));
1384 resend all pending calls on recovery
/*
 * Resend every pending call on one database after recovery.
 * The `next` pointer is captured before resending because
 * ctdb_call_resend() may cause the current state to be relinked/freed.
 */
1386 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1388 struct ctdb_call_state *state, *next;
1390 for (state = ctdb_db->pending_calls; state; state = next) {
1392 ctdb_call_resend(state);
/*
 * Resend all pending calls on every attached database — called once
 * after a recovery completes.
 */
1396 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1398 struct ctdb_db_context *ctdb_db;
1400 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1401 ctdb_call_resend_db(ctdb_db);
1406 this allows the caller to setup a async.fn
/*
 * Zero-timeout timed-event handler used to complete a locally processed
 * call asynchronously, giving the caller a chance to set async.fn after
 * ctdb_call_local_send() returns.
 */
1408 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1409 struct timeval t, void *private_data)
1411 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1412 if (state->async.fn) {
1413 state->async.fn(state);
1419 construct an event driven local ctdb_call
1421 this is used so that locally processed ctdb_call requests are processed
1422 in an event driven manner
/*
 * Construct an event-driven local ctdb_call: run the call immediately
 * via ctdb_call_local(), but deliver completion through a zero-timeout
 * event so the caller can attach async.fn first.
 *
 * Returns the new call state (talloc'd off ctdb_db), or NULL on
 * allocation failure (via CTDB_NO_MEMORY_NULL).
 *
 * NOTE(review): the `data` parameter declaration and the return
 * statement are elided in this view.
 */
1424 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1425 struct ctdb_call *call,
1426 struct ctdb_ltdb_header *header,
1429 struct ctdb_call_state *state;
1430 struct ctdb_context *ctdb = ctdb_db->ctdb;
1433 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1434 CTDB_NO_MEMORY_NULL(ctdb, state);
/* take ownership of the record data for the lifetime of the state */
1436 talloc_steal(state, data->dptr);
1438 state->state = CTDB_CALL_DONE;
1439 state->call = talloc(state, struct ctdb_call);
1440 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1441 *(state->call) = *call;
1442 state->ctdb_db = ctdb_db;
1444 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1446 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
/* fire completion from the event loop, not inline */
1449 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1456 make a remote ctdb call - async send. Called in daemon context.
1458 This constructs a ctdb_call request and queues it for processing.
1459 This call never blocks.
/*
 * Make a remote ctdb call — async send, daemon context. Builds a
 * CTDB_REQ_CALL packet (key followed by call data in the payload),
 * registers the call state by reqid, links it into the database's
 * pending-call list, and queues the packet toward the record's current
 * dmaster. Never blocks; completion arrives via ctdb_reply_call/
 * ctdb_reply_error.
 *
 * Returns the call state, or NULL if the transport is down or
 * allocation fails.
 */
1461 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1462 struct ctdb_call *call,
1463 struct ctdb_ltdb_header *header)
1466 struct ctdb_call_state *state;
1467 struct ctdb_context *ctdb = ctdb_db->ctdb;
1469 if (ctdb->methods == NULL) {
1470 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1474 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1475 CTDB_NO_MEMORY_NULL(ctdb, state);
1476 state->call = talloc(state, struct ctdb_call);
1477 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1479 state->reqid = reqid_new(ctdb->idr, state);
1480 state->ctdb_db = ctdb_db;
/* destructor unlinks from pending_calls and frees the reqid */
1481 talloc_set_destructor(state, ctdb_call_destructor);
1483 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1484 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1485 struct ctdb_req_call);
1486 CTDB_NO_MEMORY_NULL(ctdb, state->c);
/* send toward the current dmaster; intermediate nodes redirect */
1487 state->c->hdr.destnode = header->dmaster;
1489 /* this limits us to 16k outstanding messages - not unreasonable */
1490 state->c->hdr.reqid = state->reqid;
1491 state->c->hdr.generation = ctdb_db->generation;
1492 state->c->flags = call->flags;
1493 state->c->db_id = ctdb_db->db_id;
1494 state->c->callid = call->call_id;
1495 state->c->hopcount = 0;
/* payload layout: key bytes first, call data immediately after */
1496 state->c->keylen = call->key.dsize;
1497 state->c->calldatalen = call->call_data.dsize;
1498 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1499 memcpy(&state->c->data[call->key.dsize],
1500 call->call_data.dptr, call->call_data.dsize);
1501 *(state->call) = *call;
/* repoint the copied call at the packet-owned buffers so the state
 * is self-contained */
1502 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1503 state->call->key.dptr = &state->c->data[0];
1505 state->state = CTDB_CALL_WAIT;
1506 state->generation = ctdb_db->generation;
/* track it so it can be resent after a recovery */
1508 DLIST_ADD(ctdb_db->pending_calls, state);
1510 ctdb_queue_packet(ctdb, &state->c->hdr);
1516 make a remote ctdb call - async recv - called in daemon context
1518 This is called when the program wants to wait for a ctdb_call to complete and get the
1519 results. This call will block unless the call has already completed.
/*
 * Async receive half of a daemon ctdb call: pump the event loop until
 * the call reaches CTDB_CALL_DONE (or errors), then copy the reply data
 * and status into the caller's struct. Blocks unless the call has
 * already completed.
 *
 * NOTE(review): return statements and the error-path tail are elided in
 * this view.
 */
1521 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1523 while (state->state < CTDB_CALL_DONE) {
1524 event_loop_once(state->ctdb_db->ctdb->ev);
/* anything other than DONE at this point is an error state */
1526 if (state->state != CTDB_CALL_DONE) {
1527 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
/* duplicate the reply so it outlives the call state */
1532 if (state->call->reply_data.dsize) {
1533 call->reply_data.dptr = talloc_memdup(call,
1534 state->call->reply_data.dptr,
1535 state->call->reply_data.dsize);
1536 call->reply_data.dsize = state->call->reply_data.dsize;
1538 call->reply_data.dptr = NULL;
1539 call->reply_data.dsize = 0;
1541 call->status = state->call->status;
1548 send a keepalive packet to the other node
/*
 * Send a CTDB_REQ_KEEPALIVE packet to the given node and bump the
 * keepalive statistics counter. A no-op (logged at INFO) if the
 * transport is down.
 */
1550 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1552 struct ctdb_req_keepalive *r;
1554 if (ctdb->methods == NULL) {
1555 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1559 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1560 sizeof(struct ctdb_req_keepalive),
1561 struct ctdb_req_keepalive);
1562 CTDB_NO_MEMORY_FATAL(ctdb, r);
1563 r->hdr.destnode = destnode;
1566 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1568 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * A call deferred while a read-only revoke is in progress; requeued via
 * fn(ctx, hdr) once the revoke child finishes (see
 * deferred_call_destructor).
 */
1575 struct revokechild_deferred_call {
1576 struct ctdb_context *ctdb;
1577 struct ctdb_req_header *hdr;     /* the deferred request packet (owned) */
1578 deferred_requeue_fn fn;          /* how to re-inject the request */
/*
 * Tracking state for one active revoke child process, linked into
 * ctdb_db->revokechild_active. The fd event watches the pipe from the
 * child for its completion status.
 */
1582 struct revokechild_handle {
1583 struct revokechild_handle *next, *prev;   /* DLIST linkage */
1584 struct ctdb_context *ctdb;
1585 struct ctdb_db_context *ctdb_db;
1586 struct fd_event *fde;                     /* watches child status pipe */
/*
 * Context for requeueing a single deferred request from a timed event
 * after a revoke completes (see deferred_call_requeue).
 */
1593 struct revokechild_requeue_handle {
1594 struct ctdb_context *ctdb;
1595 struct ctdb_req_header *hdr;     /* the request to re-inject (owned) */
1596 deferred_requeue_fn fn;          /* requeue callback */
/*
 * Timed-event handler that re-injects one deferred request via the
 * stored requeue callback, then frees its handle (which also frees the
 * stolen packet).
 */
1600 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1601 struct timeval t, void *private_data)
1603 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1605 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1606 talloc_free(requeue_handle);
/*
 * Destructor for a deferred call: when the revoke structure it hangs off
 * is freed, schedule the deferred request for requeueing via a timed
 * event rather than dropping it.
 *
 * NOTE(review): the talloc() result is not checked for NULL here before
 * use — presumably relying on the allocator aborting; verify upstream.
 */
1609 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1611 struct ctdb_context *ctdb = deferred_call->ctdb;
1612 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1613 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
/* copy everything the requeue needs, and take ownership of the packet */
1615 requeue_handle->ctdb = ctdb;
1616 requeue_handle->hdr = deferred_call->hdr;
1617 requeue_handle->fn = deferred_call->fn;
1618 requeue_handle->ctx = deferred_call->ctx;
1619 talloc_steal(requeue_handle, requeue_handle->hdr);
1621 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1622 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
/*
 * Destructor for a revoke-child handle: tear down the fd event, close
 * the pipe ends that are still open, kill the child process, and unlink
 * the handle from the database's active-revoke list.
 */
1628 static int revokechild_destructor(struct revokechild_handle *rc)
1630 if (rc->fde != NULL) {
1631 talloc_free(rc->fde);
1634 if (rc->fd[0] != -1) {
1637 if (rc->fd[1] != -1) {
1640 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1642 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
 * fd event handler for the revoke child's status pipe: reads the
 * single-byte exit status written by the child and logs failures.
 */
1646 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1647 uint16_t flags, void *private_data)
1649 struct revokechild_handle *rc = talloc_get_type(private_data,
1650 struct revokechild_handle);
/* child writes exactly one status byte before exiting */
1654 ret = sys_read(rc->fd[0], &c, 1);
1656 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1662 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/*
 * Shared state for revoking all read-only delegations of one record:
 * revoke_send_cb / update_record_cb increment/decrement `count` (field
 * elided in this view) and set `finished`/`status` to drive the wait
 * loop in ctdb_revoke_all_delegations().
 */
1671 struct ctdb_revoke_state {
1672 struct ctdb_db_context *ctdb_db;
1674 struct ctdb_ltdb_header *header;   /* header of the record being revoked */
/*
 * Completion callback for one per-node UPDATE_RECORD control sent during
 * a revoke: records failure in the shared revoke state and marks the
 * revoke finished once the last outstanding control has answered.
 */
1681 static void update_record_cb(struct ctdb_client_control_state *state)
1683 struct ctdb_revoke_state *revoke_state;
1687 if (state == NULL) {
1690 revoke_state = state->async.private_data;
/* clear the callback before recv so it cannot re-fire */
1692 state->async.fn = NULL;
1693 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1694 if ((ret != 0) || (res != 0)) {
1695 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1696 revoke_state->status = -1;
/* last reply in flips the finished flag for the wait loop */
1699 revoke_state->count--;
1700 if (revoke_state->count <= 0) {
1701 revoke_state->finished = 1;
/*
 * Per-node callback from the tracking-db traversal: send an async
 * UPDATE_RECORD control to one delegation holder and count it as
 * outstanding. On send failure the overall revoke is marked failed.
 */
1705 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1707 struct ctdb_revoke_state *revoke_state = private_data;
1708 struct ctdb_client_control_state *state;
1710 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1711 if (state == NULL) {
1712 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1713 revoke_state->status = -1;
1716 state->async.fn = update_record_cb;
1717 state->async.private_data = revoke_state;
/* one more reply to wait for in update_record_cb */
1719 revoke_state->count++;
/*
 * Timeout guard for the revoke wait loop: if not all nodes answered
 * within the control timeout, force the loop to terminate.
 */
1723 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1724 struct timeval yt, void *private_data)
1726 struct ctdb_revoke_state *state = private_data;
1728 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1729 state->finished = 1;
/*
 * Run (in the revoke child) the revoke of all read-only delegations for
 * one record: send UPDATE_RECORD to every node listed in the tracking
 * db, wait (with timeout) for the answers, then re-lock and re-validate
 * the local record and rewrite its flags — marking the revoke complete
 * on success, or clearing REVOKING_READONLY so it can be retried.
 *
 * NOTE(review): error-path returns and the final return are elided in
 * this view; sanity checks (RSN, flags) guard against the record having
 * changed underneath the revoke.
 */
1733 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1735 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1736 struct ctdb_ltdb_header new_header;
1739 state->ctdb_db = ctdb_db;
1741 state->header = header;
/* fan out one UPDATE_RECORD control per delegation holder */
1744 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1746 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
/* pump events until all replies arrive or the timeout fires */
1748 while (state->finished == 0) {
1749 event_loop_once(ctdb->ev);
1752 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1753 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1757 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1758 ctdb_ltdb_unlock(ctdb_db, key);
1759 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* record changed since the revoke started — abort */
1764 if (new_header.rsn > header->rsn) {
1765 ctdb_ltdb_unlock(ctdb_db, key);
1766 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
/* must still be mid-revoke with delegations outstanding */
1770 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1771 ctdb_ltdb_unlock(ctdb_db, key);
1772 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1778 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1779 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1781 if (state->status == 0) {
1783 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1785 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1786 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1788 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1789 ctdb_ltdb_unlock(ctdb_db, key);
1790 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1794 ctdb_ltdb_unlock(ctdb_db, key);
/*
 * Start revoking all read-only delegations for a record by forking a
 * dedicated revoke child. The parent tracks the child via a
 * revokechild_handle (pipe + fd event + active list); the child switches
 * to client mode, runs ctdb_revoke_all_delegations(), writes its status
 * byte down the pipe, and lingers only until its parent dies.
 *
 * NOTE(review): several lines (pipe creation, error-path frees/returns,
 * final return) are elided in this view.
 */
1801 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1804 struct revokechild_handle *rc;
1805 pid_t parent = getpid();
/* present the record to the child as a plain migrated record, with
 * all read-only flags stripped */
1808 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1809 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1812 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1813 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
/* fetch the delegation tracking blob for this key */
1817 tdata = tdb_fetch(ctdb_db->rottdb, key);
1818 if (tdata.dsize > 0) {
/* re-parent the tdb_fetch buffer onto the handle */
1822 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1828 rc->ctdb_db = ctdb_db;
1832 talloc_set_destructor(rc, revokechild_destructor);
1834 rc->key.dsize = key.dsize;
1835 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1836 if (rc->key.dptr == NULL) {
1837 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1844 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1850 rc->child = ctdb_fork(ctdb);
1851 if (rc->child == (pid_t)-1) {
1852 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1857 if (rc->child == 0) {
/* ---- child process ---- */
1860 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1862 ctdb_set_process_name("ctdb_revokechild");
1863 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1864 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1866 goto child_finished;
1869 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report status to the parent over the pipe */
1872 sys_write(rc->fd[1], &c, 1);
1873 /* make sure we die when our parent dies */
1874 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
/* ---- parent process ---- */
1882 set_close_on_exec(rc->fd[0]);
1884 /* This is an active revokechild child process */
1885 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1887 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1888 EVENT_FD_READ, revokechild_handler,
1890 if (rc->fde == NULL) {
1891 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1894 tevent_fd_set_auto_close(rc->fde);
1899 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1901 struct revokechild_handle *rc;
1902 struct revokechild_deferred_call *deferred_call;
1904 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1905 if (rc->key.dsize == 0) {
1908 if (rc->key.dsize != key.dsize) {
1911 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1917 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1921 deferred_call = talloc(rc, struct revokechild_deferred_call);
1922 if (deferred_call == NULL) {
1923 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1927 deferred_call->ctdb = ctdb;
1928 deferred_call->hdr = hdr;
1929 deferred_call->fn = fn;
1930 deferred_call->ctx = call_context;
1932 talloc_set_destructor(deferred_call, deferred_call_destructor);
1933 talloc_steal(deferred_call, hdr);