2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
/*
 * Per-record state for a "sticky" record: a hot record temporarily pinned
 * to this node to stop migration thrashing.
 * NOTE(review): this view is truncated — the struct body below is missing
 * members (a `pindown` talloc context is referenced elsewhere in this file).
 */
31 struct ctdb_sticky_record {
/* back-pointer to the owning daemon context */
32 struct ctdb_context *ctdb;
/* database this sticky record belongs to */
33 struct ctdb_db_context *ctdb_db;
38 find the ctdb_db from a db index
/*
 * Look up a database context by its database id.
 * Linear scan of the singly-linked ctdb->db_list; presumably returns the
 * matching ctdb_db or NULL — tail of the function is truncated in this view.
 */
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
53 a varient of input packet that can be used in lock requeue
/*
 * Variant of the packet-input entry point usable as a lock-requeue callback:
 * recovers the typed ctdb context from the void* and re-feeds the packet
 * into the normal input path.
 */
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
/*
 * Format an error message and send a CTDB_REPLY_ERROR packet back to the
 * originator of `hdr`. Printf-style; the prototype carries
 * PRINTF_ATTRIBUTE(4,5) so the compiler checks the format arguments.
 * Silently drops the reply if the transport is down.
 * NOTE(review): several lines (va_start/va_end, status assignment, braces)
 * are missing from this view.
 */
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
73 struct ctdb_reply_error *r;
/* no transport -> nothing we can do; log and give up */
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
/* render the message onto the ctdb talloc context */
83 msg = talloc_vasprintf(ctdb, fmt, ap);
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* +1 so the NUL terminator travels with the packet */
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
/* allocate the reply as a child of msg so both free together */
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
/* route the reply back to the node the request came from */
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
/*
 * Forward a CTDB_REQ_CALL to the node that can answer it.
 * Default target is the record's lmaster; if we ARE the lmaster, forward
 * directly to the dmaster recorded in the local copy's header. Warns when
 * the hopcount grows suspiciously large (possible routing loop / hot record).
 * NOTE(review): the TDB_DATA key parameter line and hopcount increment are
 * missing from this view.
 */
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121 struct ctdb_db_context *ctdb_db,
123 struct ctdb_req_call *c,
124 struct ctdb_ltdb_header *header)
126 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128 c->hdr.destnode = lmaster;
/* lmaster == us: we know the current dmaster, send straight there */
129 if (ctdb->pnn == lmaster) {
130 c->hdr.destnode = header->dmaster;
/* log roughly once per 100 hops (hopcount in 96..99 mod 100) */
134 if (c->hopcount%100 > 95) {
135 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140 header->dmaster, c->hdr.destnode));
143 ctdb_queue_packet(ctdb, &c->hdr);
150 caller must have the chainlock before calling this routine. Caller must be
/*
 * As lmaster, record the new dmaster for a key and send a
 * CTDB_REPLY_DMASTER (carrying key, data and record flags) to that new
 * dmaster. Caller must hold the chainlock and must be the lmaster.
 * NOTE(review): the reqid parameter line and some error-path braces are
 * missing from this view.
 */
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154 struct ctdb_ltdb_header *header,
155 TDB_DATA key, TDB_DATA data,
156 uint32_t new_dmaster,
159 struct ctdb_context *ctdb = ctdb_db->ctdb;
160 struct ctdb_reply_dmaster *r;
/* protocol invariant: only the lmaster may hand out dmaster-ship */
164 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
/* persist the dmaster change locally before telling anyone */
169 header->dmaster = new_dmaster;
170 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
172 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
176 if (ctdb->methods == NULL) {
177 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx = talloc_new(ctdb);
185 /* send the CTDB_REPLY_DMASTER */
/* payload layout: key bytes, then data bytes, then 32-bit record flags */
186 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188 struct ctdb_reply_dmaster);
189 CTDB_NO_MEMORY_FATAL(ctdb, r);
191 r->hdr.destnode = new_dmaster;
192 r->hdr.reqid = reqid;
193 r->hdr.generation = ctdb_db->generation;
194 r->rsn = header->rsn;
195 r->keylen = key.dsize;
196 r->datalen = data.dsize;
197 r->db_id = ctdb_db->db_id;
198 memcpy(&r->data[0], key.dptr, key.dsize);
199 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
200 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
202 ctdb_queue_packet(ctdb, &r->hdr);
204 talloc_free(tmp_ctx);
208 send a dmaster request (give another node the dmaster for a record)
210 This is always sent to the lmaster, which ensures that the lmaster
211 always knows who the dmaster is. The lmaster will then send a
212 CTDB_REPLY_DMASTER to the new dmaster
/*
 * Give dmaster-ship of a record to the node that sent us call `c`.
 * Always goes via the lmaster (CTDB_REQ_DMASTER) so the lmaster always
 * knows the current dmaster; if we ARE the lmaster we short-circuit and
 * reply directly with ctdb_send_dmaster_reply().
 * NOTE(review): the `len` declaration and the `+ sizeof(uint32_t)` tail of
 * the length computation are missing from this view.
 */
214 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
215 struct ctdb_req_call *c,
216 struct ctdb_ltdb_header *header,
217 TDB_DATA *key, TDB_DATA *data)
219 struct ctdb_req_dmaster *r;
220 struct ctdb_context *ctdb = ctdb_db->ctdb;
222 uint32_t lmaster = ctdb_lmaster(ctdb, key);
224 if (ctdb->methods == NULL) {
225 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* remember that this record migrated with payload attached */
229 if (data->dsize != 0) {
230 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* we are the lmaster: no REQ_DMASTER round-trip needed */
233 if (lmaster == ctdb->pnn) {
234 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
235 c->hdr.srcnode, c->hdr.reqid);
239 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
241 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
242 struct ctdb_req_dmaster);
243 CTDB_NO_MEMORY_FATAL(ctdb, r);
244 r->hdr.destnode = lmaster;
/* keep the original request id so replies match the caller's state */
245 r->hdr.reqid = c->hdr.reqid;
246 r->hdr.generation = ctdb_db->generation;
248 r->rsn = header->rsn;
/* the requesting node becomes the new dmaster */
249 r->dmaster = c->hdr.srcnode;
250 r->keylen = key->dsize;
251 r->datalen = data->dsize;
252 memcpy(&r->data[0], key->dptr, key->dsize);
253 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
254 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* update our local copy to point at the new dmaster before sending */
256 header->dmaster = c->hdr.srcnode;
257 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
258 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
261 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * Timer callback: the pin-down period for a sticky record has expired.
 * Freeing sr->pindown releases the pindown context (and, per the destructor
 * pattern used elsewhere in this file, presumably requeues any deferred
 * requests parented on it).
 */
266 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
267 struct timeval t, void *private_data)
269 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
270 struct ctdb_sticky_record);
272 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
273 if (sr->pindown != NULL) {
274 talloc_free(sr->pindown);
/*
 * Start a pin-down period for a sticky record we just became dmaster for:
 * look up the record's sticky state in the rb-tree and, if no pindown is
 * active, create a pindown talloc context plus a timer that clears it after
 * tunable.sticky_pindown milliseconds.
 * NOTE(review): the return type line and several early-return branches are
 * missing from this view.
 */
280 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
282 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
284 struct ctdb_sticky_record *sr;
/* convert the TDB key into the uint32 array used as rb-tree index */
286 k = ctdb_key_to_idkey(tmp_ctx, key);
288 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
289 talloc_free(tmp_ctx);
293 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
295 talloc_free(tmp_ctx);
299 talloc_free(tmp_ctx);
/* only arm a new pindown if one is not already running */
301 if (sr->pindown == NULL) {
302 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
303 sr->pindown = talloc_new(sr);
304 if (sr->pindown == NULL) {
305 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* sticky_pindown is in ms: split into whole seconds + microseconds */
308 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
315 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
316 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
318 must be called with the chainlock held. This function releases the chainlock
/*
 * We are being handed dmaster-ship of a record (CTDB_REPLY_DMASTER came in,
 * or we are the lmaster handling our own CTDB_REQ_DMASTER). Store the record
 * with ourselves as dmaster, then complete the pending local call state.
 * Must be called with the chainlock held; releases the chainlock on all
 * paths via ctdb_ltdb_unlock().
 * NOTE(review): several guard lines (state == NULL branch, header.rsn
 * assignment, closing braces) are missing from this view.
 */
320 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
321 struct ctdb_req_header *hdr,
322 TDB_DATA key, TDB_DATA data,
323 uint64_t rsn, uint32_t record_flags)
325 struct ctdb_call_state *state;
326 struct ctdb_context *ctdb = ctdb_db->ctdb;
327 struct ctdb_ltdb_header header;
330 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
/* we are the new dmaster now */
334 header.dmaster = ctdb->pnn;
335 header.flags = record_flags;
/* find the local call that triggered this migration, if any */
337 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
340 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
342 * We temporarily add the VACUUM_MIGRATED flag to
343 * the record flags, so that ctdb_ltdb_store can
344 * decide whether the record should be stored or
347 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
351 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
352 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
354 ret = ctdb_ltdb_unlock(ctdb_db, key);
356 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
361 /* we just became DMASTER and this database is "sticky",
362 see if the record is flagged as "hot" and set up a pin-down
363 context to stop migrations for a little while if so
365 if (ctdb_db->sticky) {
366 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
/* no matching call state: stale/duplicate reply — unlock and drop */
370 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
371 ctdb->pnn, hdr->reqid, hdr->srcnode));
373 ret = ctdb_ltdb_unlock(ctdb_db, key);
375 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* sanity: the reqid's call state must be for this exact key */
380 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
381 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
383 ret = ctdb_ltdb_unlock(ctdb_db, key);
385 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
390 if (hdr->reqid != state->reqid) {
391 /* we found a record but it was the wrong one */
392 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
394 ret = ctdb_ltdb_unlock(ctdb_db, key);
396 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* run the pending call locally now that we own the record */
401 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
403 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
405 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* wake up whoever was waiting on this call */
408 state->state = CTDB_CALL_DONE;
409 if (state->async.fn) {
410 state->async.fn(state);
/* One deferred packet waiting for an in-progress dmaster update on the
 * same key to finish; kept on a doubly-linked list (DLIST). */
414 struct dmaster_defer_call {
415 struct dmaster_defer_call *next, *prev;
416 struct ctdb_context *ctdb;
/* the original request packet, talloc-stolen onto this struct */
417 struct ctdb_req_header *hdr;
/* Per-key queue of deferred calls, indexed in ctdb_db->defer_dmaster.
 * NOTE(review): a `generation` member (referenced by the destructor below)
 * is missing from this truncated view. */
420 struct dmaster_defer_queue {
421 struct ctdb_db_context *ctdb_db;
423 struct dmaster_defer_call *deferred_calls;
/*
 * Zero-delay timer callback: re-inject a previously deferred packet into
 * the normal input path, then (presumably, tail truncated) free the call.
 */
426 static void dmaster_defer_reprocess(struct tevent_context *ev,
427 struct tevent_timer *te,
431 struct dmaster_defer_call *call = talloc_get_type(
432 private_data, struct dmaster_defer_call);
434 ctdb_input_pkt(call->ctdb, call->hdr);
/*
 * Destructor for a dmaster defer queue: when the queue is freed, schedule
 * every deferred call for reprocessing via a zero-delay timer — unless a
 * database recovery happened in between (generation mismatch), in which
 * case the deferred requests are dropped (they will be resent anyway).
 */
438 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
440 /* Ignore requests, if database recovery happens in-between. */
441 if (ddq->generation != ddq->ctdb_db->generation) {
445 while (ddq->deferred_calls != NULL) {
446 struct dmaster_defer_call *call = ddq->deferred_calls;
448 DLIST_REMOVE(ddq->deferred_calls, call);
/* reparent onto ctdb so the call survives the queue being freed */
450 talloc_steal(call->ctdb, call);
451 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
452 dmaster_defer_reprocess, call);
/* rb-tree insert callback for defer queues; body truncated in this view —
 * presumably returns the new queue (parm), replacing any old `data`. */
457 static void *insert_ddq_callback(void *parm, void *data)
466 * This function is used to reigster a key in database that needs to be updated.
467 * Any requests for that key should get deferred till this is completed.
466 * This function is used to reigster a key in database that needs to be updated.
467 * Any requests for that key should get deferred till this is completed.
/*
 * Create (or reuse) the per-key dmaster defer queue. The queue is parented
 * on `hdr`, so when processing of this request finishes and hdr is freed,
 * the queue destructor fires and requeues any deferred calls.
 * NOTE(review): several return statements and braces are missing from this
 * truncated view.
 */
469 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
470 struct ctdb_req_header *hdr,
474 struct dmaster_defer_queue *ddq;
476 k = ctdb_key_to_idkey(hdr, key);
478 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
/* an existing queue from the current generation can be reused as-is */
483 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
485 if (ddq->generation == ctdb_db->generation) {
490 /* Recovery ocurred - get rid of old queue. All the deferred
491 * requests will be resent anyway from ctdb_call_resend_db.
496 ddq = talloc(hdr, struct dmaster_defer_queue);
498 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
502 ddq->ctdb_db = ctdb_db;
503 ddq->generation = hdr->generation;
504 ddq->deferred_calls = NULL;
506 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
507 insert_ddq_callback, ddq);
/* freeing ddq (when hdr is freed) triggers requeueing of deferred calls */
508 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
/*
 * If a dmaster update is in progress for this key, append the incoming
 * request to the key's defer queue (stealing ownership of hdr) instead of
 * processing it now. Presumably returns 0 when deferred and nonzero when
 * the caller should process the packet normally — early returns are
 * truncated in this view.
 */
514 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
515 struct ctdb_req_header *hdr,
518 struct dmaster_defer_queue *ddq;
519 struct dmaster_defer_call *call;
522 k = ctdb_key_to_idkey(hdr, key);
524 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
528 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
/* queue is from an older generation: disarm its destructor (its deferred
 * calls would be dropped/resent after recovery anyway) */
536 if (ddq->generation != hdr->generation) {
537 talloc_set_destructor(ddq, NULL);
542 call = talloc(ddq, struct dmaster_defer_call);
544 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
548 call->ctdb = ctdb_db->ctdb;
/* take ownership of the packet so it survives until reprocessing */
549 call->hdr = talloc_steal(call, hdr);
551 DLIST_ADD_END(ddq->deferred_calls, call, NULL);
557 called when a CTDB_REQ_DMASTER packet comes in
559 this comes into the lmaster for a record when the current dmaster
560 wants to give up the dmaster role and give it to someone else
/*
 * Handle an incoming CTDB_REQ_DMASTER packet. This arrives at the lmaster
 * when the current dmaster wants to hand the record to another node. We
 * validate the sender really is the current dmaster, store the new dmaster,
 * and either become dmaster ourselves or send a CTDB_REPLY_DMASTER to the
 * new dmaster. Forces a recovery on protocol violations.
 * NOTE(review): many lines (declarations, braces, key.dptr assignment,
 * early returns) are missing from this truncated view.
 */
562 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
564 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
565 TDB_DATA key, data, data2;
566 struct ctdb_ltdb_header header;
567 struct ctdb_db_context *ctdb_db;
568 uint32_t record_flags = 0;
572 ctdb_db = find_ctdb_db(ctdb, c->db_id);
574 ctdb_send_error(ctdb, hdr, -1,
575 "Unknown database in request. db_id==0x%08x",
/* drop packets from a different database generation (recovery boundary) */
580 if (hdr->generation != ctdb_db->generation) {
582 ("ctdb operation %u request %u from node %u to %u had an"
583 " invalid generation:%u while our generation is:%u\n",
584 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
585 hdr->generation, ctdb_db->generation));
/* unpack key and data from the packet's trailing blob */
590 key.dsize = c->keylen;
591 data.dptr = c->data + c->keylen;
592 data.dsize = c->datalen;
/* if the packet is long enough it also carries trailing record flags */
593 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
595 if (len <= c->hdr.length) {
596 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
597 sizeof(record_flags));
/* defer other requests for this key until this update completes */
600 dmaster_defer_setup(ctdb_db, hdr, key);
602 /* fetch the current record */
603 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
604 ctdb_call_input_pkt, ctdb, false);
606 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
610 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
/* only the lmaster may process a REQ_DMASTER */
614 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
615 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
616 ctdb->pnn, ctdb_lmaster(ctdb, &key),
617 hdr->generation, ctdb->vnn_map->generation));
618 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
621 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
622 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
624 /* its a protocol error if the sending node is not the current dmaster */
625 if (header.dmaster != hdr->srcnode) {
626 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
627 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
628 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
629 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
630 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
/* rsn==0 && dmaster==us can happen for a never-migrated record; anything
 * else is a real inconsistency: force a recovery */
631 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
632 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
634 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
635 ctdb_ltdb_unlock(ctdb_db, key);
/* a dmaster request must never carry an rsn older than ours */
640 if (header.rsn > c->rsn) {
641 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
642 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
643 ctdb_db->db_id, hdr->generation, ctdb_db->generation,
644 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
647 /* use the rsn from the sending node */
650 /* store the record flags from the sending node */
651 header.flags = record_flags;
653 /* check if the new dmaster is the lmaster, in which case we
654 skip the dmaster reply */
655 if (c->dmaster == ctdb->pnn) {
/* note: ctdb_become_dmaster releases the chainlock itself */
656 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
658 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
660 ret = ctdb_ltdb_unlock(ctdb_db, key);
662 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/*
 * Timer callback: a record's sticky duration has elapsed. Recovers the
 * sticky record from private_data; presumably frees it (tail truncated).
 */
667 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
668 struct timeval t, void *private_data)
670 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
671 struct ctdb_sticky_record);
/* rb-tree insert callback: if a sticky record already exists for the key
 * (data != NULL), log and replace it with the new one (parm); body tail
 * truncated in this view. */
675 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
678 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/*
 * Mark a hot record as "sticky": insert a ctdb_sticky_record into the db's
 * rb-tree and arm a timer that expires it after tunable.sticky_duration
 * seconds. If the record is already sticky, nothing new is created.
 * NOTE(review): the return-type line, several early returns and the
 * sr->ctdb assignment are missing from this truncated view.
 */
685 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
687 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
689 struct ctdb_sticky_record *sr;
691 k = ctdb_key_to_idkey(tmp_ctx, key);
693 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
694 talloc_free(tmp_ctx);
/* already sticky? then there is nothing to do */
698 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
700 talloc_free(tmp_ctx);
/* parent on sticky_records so the record dies with the tree */
704 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
706 talloc_free(tmp_ctx);
707 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
712 sr->ctdb_db = ctdb_db;
715 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
716 ctdb->tunable.sticky_duration,
717 ctdb_db->db_name, ctdb_hash(&key)));
719 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* auto-expire the stickiness after sticky_duration seconds */
721 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
723 talloc_free(tmp_ctx);
/* Carries a deferred packet from the pindown destructor to the zero-delay
 * requeue timer callback. */
727 struct pinned_down_requeue_handle {
728 struct ctdb_context *ctdb;
729 struct ctdb_req_header *hdr;
/* A request deferred because its record is pinned down; parented on the
 * pindown context so its destructor fires when the pindown ends. */
732 struct pinned_down_deferred_call {
733 struct ctdb_context *ctdb;
734 struct ctdb_req_header *hdr;
/*
 * Zero-delay timer callback: hand the deferred packet back to the daemon
 * context and re-inject it into the input path. Presumably frees the
 * handle afterwards (tail truncated).
 */
737 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
738 struct timeval t, void *private_data)
740 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
741 struct ctdb_context *ctdb = handle->ctdb;
/* reparent so the packet outlives the handle */
743 talloc_steal(ctdb, handle->hdr);
744 ctdb_call_input_pkt(ctdb, handle->hdr);
/*
 * Destructor for a pinned-down deferred call: when the pindown context is
 * freed (pindown ended), move the packet to a requeue handle and schedule
 * a zero-delay timer to reprocess it.
 */
749 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
751 struct ctdb_context *ctdb = pinned_down->ctdb;
752 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
754 handle->ctdb = pinned_down->ctdb;
755 handle->hdr = pinned_down->hdr;
/* take the packet away from the dying pinned_down object */
756 talloc_steal(handle, handle->hdr);
758 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
/*
 * If the record for `key` is currently pinned down, defer this request:
 * parent a pinned_down_deferred_call (owning hdr) on the pindown context so
 * it is automatically requeued when the pindown ends. Presumably returns 0
 * when deferred and nonzero/-1 when the caller should proceed — the return
 * statements are missing from this truncated view.
 */
764 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
766 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
768 struct ctdb_sticky_record *sr;
769 struct pinned_down_deferred_call *pinned_down;
771 k = ctdb_key_to_idkey(tmp_ctx, key);
773 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
774 talloc_free(tmp_ctx);
778 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
780 talloc_free(tmp_ctx);
784 talloc_free(tmp_ctx);
/* no active pindown: nothing to defer against */
786 if (sr->pindown == NULL) {
/* parent on sr->pindown: freeing the pindown fires the destructor below */
790 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
791 if (pinned_down == NULL) {
792 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
796 pinned_down->ctdb = ctdb;
797 pinned_down->hdr = hdr;
799 talloc_set_destructor(pinned_down, pinned_down_destructor);
/* own the packet for the duration of the deferral */
800 talloc_steal(pinned_down, hdr);
/*
 * Maintain the per-database table of MAX_HOT_KEYS hottest keys, ranked by
 * hop count. Invariant: the entry with the smallest count sits at index 0,
 * so a new sample is ignored quickly when it cannot enter the table. If the
 * key is already present its count is raised; otherwise it replaces slot 0
 * (or fills a free slot), then slot 0 is re-established as the minimum by
 * swapping with any smaller entry.
 * NOTE(review): the declarations (i, id), several returns and closing
 * braces are missing from this truncated view.
 */
806 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
810 /* smallest value is always at index 0 */
811 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
815 /* see if we already know this key */
816 for (i = 0; i < MAX_HOT_KEYS; i++) {
817 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
820 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
823 /* found an entry for this key */
824 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
827 ctdb_db->statistics.hot_keys[i].count = hopcount;
/* table not full yet: take a fresh slot, else evict the minimum (slot 0) */
831 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
832 id = ctdb_db->statistics.num_hot_keys;
833 ctdb_db->statistics.num_hot_keys++;
838 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
839 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
841 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
842 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
843 ctdb_db->statistics.hot_keys[id].count = hopcount;
844 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
845 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
/* restore the invariant: move the smallest count back into slot 0 */
848 for (i = 1; i < MAX_HOT_KEYS; i++) {
849 if (ctdb_db->statistics.hot_keys[i].count == 0) {
852 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
/* swap counts... */
853 hopcount = ctdb_db->statistics.hot_keys[i].count;
854 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
855 ctdb_db->statistics.hot_keys[0].count = hopcount;
/* ...and the keys, reusing `key` as the swap temporary */
857 key = ctdb_db->statistics.hot_keys[i].key;
858 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
859 ctdb_db->statistics.hot_keys[0].key = key;
865 called when a CTDB_REQ_CALL packet comes in
/*
 * Handle an incoming CTDB_REQ_CALL packet — the main record-access path.
 * Outline: validate db/generation, honour pindown and dmaster deferrals,
 * lock+fetch the record, process read-only delegation flags, redirect to
 * lmaster/dmaster if we don't own the record, hand out a read-only
 * delegation or migrate the record to the caller, otherwise execute the
 * call locally and send a CTDB_REPLY_CALL.
 * NOTE(review): many lines (declarations, returns, braces, the rsn bump
 * for first delegation, the hop-count bucket loop) are missing from this
 * truncated view.
 */
867 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
869 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
871 struct ctdb_reply_call *r;
873 struct ctdb_ltdb_header header;
874 struct ctdb_call *call;
875 struct ctdb_db_context *ctdb_db;
876 int tmp_count, bucket;
878 if (ctdb->methods == NULL) {
879 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
883 ctdb_db = find_ctdb_db(ctdb, c->db_id);
885 ctdb_send_error(ctdb, hdr, -1,
886 "Unknown database in request. db_id==0x%08x",
/* drop cross-generation packets (sender will resend after recovery) */
891 if (hdr->generation != ctdb_db->generation) {
893 ("ctdb operation %u request %u from node %u to %u had an"
894 " invalid generation:%u while our generation is:%u\n",
895 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
896 hdr->generation, ctdb_db->generation));
/* unpack the call from the packet; key and call_data point into c->data */
900 call = talloc(hdr, struct ctdb_call);
901 CTDB_NO_MEMORY_FATAL(ctdb, call);
903 call->call_id = c->callid;
904 call->key.dptr = c->data;
905 call->key.dsize = c->keylen;
906 call->call_data.dptr = c->data + c->keylen;
907 call->call_data.dsize = c->calldatalen;
908 call->reply_data.dptr = NULL;
909 call->reply_data.dsize = 0;
912 /* If this record is pinned down we should defer the
913 request until the pindown times out
915 if (ctdb_db->sticky) {
916 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
918 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
/* a dmaster change is in flight for this key: queue behind it */
924 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
929 /* determine if we are the dmaster for this key. This also
930 fetches the record data (if any), thus avoiding a 2nd fetch of the data
931 if the call will be answered locally */
933 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
934 ctdb_call_input_pkt, ctdb, false);
936 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
941 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
946 /* Dont do READONLY if we dont have a tracking database */
947 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
948 c->flags &= ~CTDB_WANT_READONLY;
/* a completed revoke: clear all RO flags and drop the tracking record */
951 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
952 header.flags &= ~CTDB_REC_RO_FLAGS;
953 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
954 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
955 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
956 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
958 /* and clear out the tracking data */
959 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
960 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
964 /* if we are revoking, we must defer all other calls until the revoke
967 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
968 talloc_free(data.dptr);
969 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
971 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
972 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
979 * If we are not the dmaster and are not hosting any delegations,
980 * then we redirect the request to the node than can answer it
981 * (the lmaster or the dmaster).
983 if ((header.dmaster != ctdb->pnn)
984 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
985 talloc_free(data.dptr);
986 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
988 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
990 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* a writable request while RO copies exist: start revoking delegations */
996 if ( (!(c->flags & CTDB_WANT_READONLY))
997 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
998 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
999 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1000 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1002 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1004 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1005 ctdb_fatal(ctdb, "Failed to start record revoke");
1007 talloc_free(data.dptr);
1009 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1010 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1017 /* If this is the first request for delegation. bump rsn and set
1018 * the delegations flag
1020 if ((c->flags & CTDB_WANT_READONLY)
1021 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1022 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1024 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1025 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1026 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
/* record the delegation holder in the read-only tracking db */
1029 if ((c->flags & CTDB_WANT_READONLY)
1030 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1033 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1034 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1035 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1037 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1038 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1042 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1044 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* build the delegation reply: ltdb header + record data */
1047 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1048 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1049 struct ctdb_reply_call);
1050 CTDB_NO_MEMORY_FATAL(ctdb, r);
1051 r->hdr.destnode = c->hdr.srcnode;
1052 r->hdr.reqid = c->hdr.reqid;
1053 r->hdr.generation = ctdb_db->generation;
1055 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
/* the copy sent out is a read-only copy, not a delegation holder */
1057 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1058 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1059 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1062 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1065 ctdb_queue_packet(ctdb, &r->hdr);
1066 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1067 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
/* hop-count statistics (bucketed) and hot-key tracking */
1074 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1075 tmp_count = c->hopcount;
1081 if (bucket >= MAX_COUNT_BUCKETS) {
1082 bucket = MAX_COUNT_BUCKETS - 1;
1084 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1085 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1086 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1088 /* If this database supports sticky records, then check if the
1089 hopcount is big. If it is it means the record is hot and we
1090 should make it sticky.
1092 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1093 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1097 /* Try if possible to migrate the record off to the caller node.
1098 * From the clients perspective a fetch of the data is just as
1099 * expensive as a migration.
1101 if (c->hdr.srcnode != ctdb->pnn) {
/* don't migrate records out from under an active persistent transaction */
1102 if (ctdb_db->persistent_state) {
1103 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1104 " of key %s while transaction is active\n",
1105 (char *)call->key.dptr));
1107 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1108 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1109 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1110 talloc_free(data.dptr);
1112 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1114 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* local case: run the call here and reply with its result */
1121 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1123 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1127 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1129 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1132 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1133 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1134 struct ctdb_reply_call);
1135 CTDB_NO_MEMORY_FATAL(ctdb, r);
1136 r->hdr.destnode = hdr->srcnode;
1137 r->hdr.reqid = hdr->reqid;
1138 r->hdr.generation = ctdb_db->generation;
1139 r->status = call->status;
1140 r->datalen = call->reply_data.dsize;
1141 if (call->reply_data.dsize) {
1142 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1145 ctdb_queue_packet(ctdb, &r->hdr);
1152 * called when a CTDB_REPLY_CALL packet comes in
1154 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1155 * contains any reply data from the call
/*
 * Handle a CTDB_REPLY_CALL packet — the response to a CTDB_REQ_CALL we
 * sent. Matches the reply to its pending call state by reqid, optionally
 * stores a newer read-only delegation copy of the record (for
 * FETCH_WITH_HEADER replies carrying an ltdb header), then hands the reply
 * data to the waiting caller and fires its async completion callback.
 * NOTE(review): various returns and braces are missing from this truncated
 * view.
 */
1157 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1159 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1160 struct ctdb_call_state *state;
1162 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1163 if (state == NULL) {
1164 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1168 if (hdr->reqid != state->reqid) {
1169 /* we found a record but it was the wrong one */
1170 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
/* drop replies that crossed a recovery boundary */
1174 if (hdr->generation != state->generation) {
1176 ("ctdb operation %u request %u from node %u to %u had an"
1177 " invalid generation:%u while our generation is:%u\n",
1178 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1179 hdr->generation, state->generation));
1184 /* read only delegation processing */
1185 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1186 * delegation since we may need to update the record header
1188 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1189 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* the reply payload begins with the record's ltdb header */
1190 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1191 struct ctdb_ltdb_header oldheader;
1192 TDB_DATA key, data, olddata;
/* not a read-only copy: nothing to store locally */
1195 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1200 key.dsize = state->c->keylen;
1201 key.dptr = state->c->data;
1202 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1203 ctdb_call_input_pkt, ctdb, false);
1208 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1212 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1214 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1215 ctdb_ltdb_unlock(ctdb_db, key);
/* only store if the delegation copy is strictly newer than ours */
1219 if (header->rsn <= oldheader.rsn) {
1220 ctdb_ltdb_unlock(ctdb_db, key);
/* sanity: payload must be at least one ltdb header long */
1224 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1225 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1226 ctdb_ltdb_unlock(ctdb_db, key);
1230 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1231 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1232 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1234 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1235 ctdb_ltdb_unlock(ctdb_db, key);
1239 ctdb_ltdb_unlock(ctdb_db, key);
/* hand the reply to the waiting caller; keep c alive under state */
1243 state->call->reply_data.dptr = c->data;
1244 state->call->reply_data.dsize = c->datalen;
1245 state->call->status = c->status;
1247 talloc_steal(state, c);
1249 state->state = CTDB_CALL_DONE;
1250 if (state->async.fn) {
1251 state->async.fn(state);
1257 * called when a CTDB_REPLY_DMASTER packet comes in
1259 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1260 * request packet. It means that the current dmaster wants to give us
/*
 * Handle an incoming CTDB_REPLY_DMASTER packet: the lmaster is handing
 * dmaster-ship of a record to this node. Validates db_id and generation,
 * unpacks key/data/record_flags from the packet, takes the chain lock
 * (requeueing on contention) and then becomes dmaster for the record.
 *
 * NOTE(review): extract is missing interleaved lines (e.g. the line that
 * sets key.dptr, error-path returns); comments cover visible logic only.
 */
1263 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1265 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1266 struct ctdb_db_context *ctdb_db;
1268 uint32_t record_flags = 0;
1272 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1273 if (ctdb_db == NULL) {
1274 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
/* drop packets from a different database generation (recovery happened) */
1278 if (hdr->generation != ctdb_db->generation) {
1280 ("ctdb operation %u request %u from node %u to %u had an"
1281 " invalid generation:%u while our generation is:%u\n",
1282 hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1283 hdr->generation, ctdb_db->generation));
/* key bytes come first in c->data, record data follows */
1288 key.dsize = c->keylen;
1289 data.dptr = &c->data[key.dsize];
1290 data.dsize = c->datalen;
/* record_flags trail key+data, only present if the packet is long enough
 * (kept optional for compatibility with senders that omit them) */
1291 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1293 if (len <= c->hdr.length) {
1294 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1295 sizeof(record_flags));
1298 dmaster_defer_setup(ctdb_db, hdr, key);
/* on lock contention the packet is requeued via ctdb_call_input_pkt */
1300 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1301 ctdb_call_input_pkt, ctdb, false);
1306 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1310 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1315 called when a CTDB_REPLY_ERROR packet comes in
/*
 * Handle an incoming CTDB_REPLY_ERROR packet: locate the pending call
 * state by reqid, record the error message and flip the call into the
 * CTDB_CALL_ERROR state, then fire the async completion callback.
 *
 * NOTE(review): extract is missing interleaved lines (braces/returns);
 * comments cover visible logic only.
 */
1317 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1319 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1320 struct ctdb_call_state *state;
1322 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1323 if (state == NULL) {
1324 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1325 ctdb->pnn, hdr->reqid));
/* reqid slot was recycled: reply belongs to some other, stale request */
1329 if (hdr->reqid != state->reqid) {
1330 /* we found a record but it was the wrong one */
1331 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
/* keep the packet alive as long as the call state, since errmsg points
 * into it */
1335 talloc_steal(state, c);
1337 state->state = CTDB_CALL_ERROR;
1338 state->errmsg = (char *)c->msg;
1339 if (state->async.fn) {
1340 state->async.fn(state);
/*
 * Talloc destructor for a ctdb_call_state: unlink it from the per-db
 * pending-calls list and release its reqid so the id can be reused.
 */
1348 static int ctdb_call_destructor(struct ctdb_call_state *state)
1350 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1351 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1357 called when a ctdb_call needs to be resent after a reconfigure event
/*
 * Resend a pending ctdb_call after a recovery/reconfigure event.
 *
 * Allocates a fresh reqid (so a late reply to the old id is ignored),
 * stamps the new generation on the request, and re-queues the packet to
 * ourselves so normal routing redirects it to the current dmaster.
 */
1359 static void ctdb_call_resend(struct ctdb_call_state *state)
1361 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1363 state->generation = state->ctdb_db->generation;
1365 /* use a new reqid, in case the old reply does eventually come in */
1366 ctdb_reqid_remove(ctdb, state->reqid);
1367 state->reqid = ctdb_reqid_new(ctdb, state);
1368 state->c->hdr.reqid = state->reqid;
1370 /* update the generation count for this request, so its valid with the new vnn_map */
1371 state->c->hdr.generation = state->generation;
1373 /* send the packet to ourselves, it will be redirected appropriately */
1374 state->c->hdr.destnode = ctdb->pnn;
1376 ctdb_queue_packet(ctdb, &state->c->hdr);
1377 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1381 resend all pending calls on recovery
/*
 * Resend every pending call on one database after recovery.
 * Iterates with a saved `next` pointer because resending may unlink the
 * current entry from the pending list.
 * NOTE(review): the line assigning `next` is missing from this extract.
 */
1383 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1385 struct ctdb_call_state *state, *next;
1387 for (state = ctdb_db->pending_calls; state; state = next) {
1389 ctdb_call_resend(state);
/*
 * Resend all pending calls on every attached database (called after a
 * cluster recovery).
 */
1393 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1395 struct ctdb_db_context *ctdb_db;
1397 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1398 ctdb_call_resend_db(ctdb_db);
1403 this allows the caller to setup a async.fn
/*
 * Zero-delay timed-event trampoline: fires the call's async completion
 * callback from the event loop, so a locally-processed call completes
 * asynchronously like a remote one (the caller gets a chance to set
 * async.fn before it runs).
 */
1405 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1406 struct timeval t, void *private_data)
1408 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1409 if (state->async.fn) {
1410 state->async.fn(state);
1416 construct an event driven local ctdb_call
1418 this is used so that locally processed ctdb_call requests are processed
1419 in an event driven manner
/*
 * Construct an event-driven, locally-processed ctdb_call.
 *
 * Runs ctdb_call_local() synchronously, but delivers completion through
 * a zero-timeout event (call_local_trigger) so the caller can install
 * async.fn first. Returns the call state (allocated off ctdb_db).
 *
 * NOTE(review): extract is missing interleaved lines (e.g. the `data`
 * parameter declaration and the final return); comments cover visible
 * logic only.
 */
1421 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1422 struct ctdb_call *call,
1423 struct ctdb_ltdb_header *header,
1426 struct ctdb_call_state *state;
1427 struct ctdb_context *ctdb = ctdb_db->ctdb;
1430 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1431 CTDB_NO_MEMORY_NULL(ctdb, state);
/* take ownership of the record data for the lifetime of this call */
1433 talloc_steal(state, data->dptr);
1435 state->state = CTDB_CALL_DONE;
1436 state->call = talloc(state, struct ctdb_call);
1437 CTDB_NO_MEMORY_NULL(ctdb, state->call);
/* copy the caller's call description so we own our own instance */
1438 *(state->call) = *call;
1439 state->ctdb_db = ctdb_db;
1441 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
/* failure is logged but not fatal: status is carried in state->call */
1443 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
/* defer completion to the event loop so async.fn can be set by caller */
1446 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1453 make a remote ctdb call - async send. Called in daemon context.
1455 This constructs a ctdb_call request and queues it for processing.
1456 This call never blocks.
/*
 * Make a remote ctdb call - async send, daemon context.
 *
 * Builds a CTDB_REQ_CALL packet (key followed by call data in the
 * payload), registers a reqid, links the state onto the db's pending
 * list for resend-on-recovery, and queues the packet towards the
 * record's current dmaster. Never blocks. Returns the call state, or
 * NULL when the transport is down / allocation fails.
 *
 * NOTE(review): extract is missing interleaved lines (braces, `return
 * NULL;` on the transport-down path, final `return state;`).
 */
1458 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1459 struct ctdb_call *call,
1460 struct ctdb_ltdb_header *header)
1463 struct ctdb_call_state *state;
1464 struct ctdb_context *ctdb = ctdb_db->ctdb;
1466 if (ctdb->methods == NULL) {
1467 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1471 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1472 CTDB_NO_MEMORY_NULL(ctdb, state);
1473 state->call = talloc(state, struct ctdb_call);
1474 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1476 state->reqid = ctdb_reqid_new(ctdb, state);
1477 state->ctdb_db = ctdb_db;
/* destructor unlinks from pending_calls and frees the reqid */
1478 talloc_set_destructor(state, ctdb_call_destructor);
/* packet payload layout: [key bytes][call data bytes] */
1480 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1481 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1482 struct ctdb_req_call);
1483 CTDB_NO_MEMORY_NULL(ctdb, state->c);
/* route towards the node we currently believe is dmaster */
1484 state->c->hdr.destnode = header->dmaster;
1486 /* this limits us to 16k outstanding messages - not unreasonable */
1487 state->c->hdr.reqid = state->reqid;
1488 state->c->hdr.generation = ctdb_db->generation;
1489 state->c->flags = call->flags;
1490 state->c->db_id = ctdb_db->db_id;
1491 state->c->callid = call->call_id;
1492 state->c->hopcount = 0;
1493 state->c->keylen = call->key.dsize;
1494 state->c->calldatalen = call->call_data.dsize;
1495 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1496 memcpy(&state->c->data[call->key.dsize],
1497 call->call_data.dptr, call->call_data.dsize);
/* our private copy of the call points into the packet buffer, which is
 * owned by state, so the pointers stay valid for the call's lifetime */
1498 *(state->call) = *call;
1499 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1500 state->call->key.dptr = &state->c->data[0];
1502 state->state = CTDB_CALL_WAIT;
1503 state->generation = ctdb_db->generation;
/* track for resend if a recovery happens before the reply arrives */
1505 DLIST_ADD(ctdb_db->pending_calls, state);
1507 ctdb_queue_packet(ctdb, &state->c->hdr);
1513 make a remote ctdb call - async recv - called in daemon context
1515 This is called when the program wants to wait for a ctdb_call to complete and get the
1516 results. This call will block unless the call has already completed.
/*
 * Make a remote ctdb call - async recv, daemon context.
 *
 * Blocks in the event loop until the call completes (or errors), then
 * copies the reply data and status into the caller's ctdb_call.
 * Reply data is duplicated onto the caller's talloc context so it
 * survives the call state being freed.
 *
 * NOTE(review): extract is missing interleaved lines (error `return -1;`,
 * the `} else {` of the reply-data branch, final `return 0;`).
 */
1518 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
/* pump events until the state advances past WAIT */
1520 while (state->state < CTDB_CALL_DONE) {
1521 event_loop_once(state->ctdb_db->ctdb->ev);
/* anything other than DONE here means CTDB_CALL_ERROR */
1523 if (state->state != CTDB_CALL_DONE) {
1524 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1529 if (state->call->reply_data.dsize) {
1530 call->reply_data.dptr = talloc_memdup(call,
1531 state->call->reply_data.dptr,
1532 state->call->reply_data.dsize);
1533 call->reply_data.dsize = state->call->reply_data.dsize;
1535 call->reply_data.dptr = NULL;
1536 call->reply_data.dsize = 0;
1538 call->status = state->call->status;
1545 send a keepalive packet to the other node
/*
 * Send a CTDB_REQ_KEEPALIVE packet to the given node and bump the
 * keepalive statistics counter. Silently does nothing (beyond an INFO
 * log) when the transport is down.
 */
1547 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1549 struct ctdb_req_keepalive *r;
1551 if (ctdb->methods == NULL) {
1552 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1556 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1557 sizeof(struct ctdb_req_keepalive),
1558 struct ctdb_req_keepalive);
1559 CTDB_NO_MEMORY_FATAL(ctdb, r);
1560 r->hdr.destnode = destnode;
1563 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1565 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * A call deferred while a read-only revoke is in flight; requeued from
 * the destructor once the revoke child finishes.
 * NOTE(review): extract is missing trailing members (a `ctx` member is
 * referenced by deferred_call_destructor) and the closing brace.
 */
1572 struct revokechild_deferred_call {
1573 struct ctdb_context *ctdb;
1574 struct ctdb_req_header *hdr;            /* the deferred request packet */
1575 deferred_requeue_fn fn;                 /* how to re-inject hdr later */
/*
 * Tracks one active revoke child process for a record; linked into
 * ctdb_db->revokechild_active.
 * NOTE(review): extract is missing trailing members (fd[2], child pid,
 * key — all referenced by the revoke functions below) and the brace.
 */
1579 struct revokechild_handle {
1580 struct revokechild_handle *next, *prev; /* DLIST linkage */
1581 struct ctdb_context *ctdb;
1582 struct ctdb_db_context *ctdb_db;
1583 struct fd_event *fde;                   /* watches the child's status pipe */
/*
 * Carries a deferred request across the timed-event hop used to requeue
 * it (see deferred_call_requeue / deferred_call_destructor).
 * NOTE(review): extract is missing trailing members (a `ctx` member is
 * used by deferred_call_requeue) and the closing brace.
 */
1590 struct revokechild_requeue_handle {
1591 struct ctdb_context *ctdb;
1592 struct ctdb_req_header *hdr;            /* owned via talloc_steal */
1593 deferred_requeue_fn fn;
/*
 * Timed-event callback: re-inject a previously deferred request via the
 * stored requeue function, then free the handle (which also frees the
 * stolen packet).
 */
1597 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1598 struct timeval t, void *private_data)
1600 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1602 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1603 talloc_free(requeue_handle);
/*
 * Destructor for a revokechild_deferred_call: instead of dropping the
 * deferred request, schedule it for requeueing via a timed event.
 * Read-only requests get a 1 second grace period so in-flight
 * read/write work can finish; others are requeued immediately.
 *
 * NOTE(review): the NULL-check for the talloc() below and the final
 * return are missing from this extract.
 */
1606 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1608 struct ctdb_context *ctdb = deferred_call->ctdb;
1609 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1610 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
/* copy everything we need out of the dying object */
1612 requeue_handle->ctdb = ctdb;
1613 requeue_handle->hdr = deferred_call->hdr;
1614 requeue_handle->fn = deferred_call->fn;
1615 requeue_handle->ctx = deferred_call->ctx;
/* keep the packet alive past the deferred_call's destruction */
1616 talloc_steal(requeue_handle, requeue_handle->hdr);
1618 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1619 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
/*
 * Destructor for a revokechild_handle: tear down the fd event, close
 * both pipe ends if open, kill the child process and unlink the handle
 * from the db's active-revoke list.
 * NOTE(review): the close() calls inside the fd checks and the final
 * return are missing from this extract.
 */
1625 static int revokechild_destructor(struct revokechild_handle *rc)
1627 if (rc->fde != NULL) {
1628 talloc_free(rc->fde);
1631 if (rc->fd[0] != -1) {
1634 if (rc->fd[1] != -1) {
1637 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1639 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
 * fd event handler for the revoke child's status pipe: reads the single
 * status byte the child writes and logs failures (read error or a
 * non-zero child status).
 * NOTE(review): declarations of `ret`/`c`, the error-branch braces and
 * the cleanup path are missing from this extract.
 */
1643 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1644 uint16_t flags, void *private_data)
1646 struct revokechild_handle *rc = talloc_get_type(private_data,
1647 struct revokechild_handle);
1651 ret = sys_read(rc->fd[0], &c, 1);
1653 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1659 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/*
 * Shared state for revoking all read-only delegations of one record;
 * updated by the per-node update_record_cb callbacks.
 * NOTE(review): extract is missing trailing members (key, data, count,
 * status, finished — all used by the revoke functions) and the brace.
 */
1668 struct ctdb_revoke_state {
1669 struct ctdb_db_context *ctdb_db;
1671 struct ctdb_ltdb_header *header;        /* header to push to delegation holders */
/*
 * Completion callback for one per-node UPDATE_RECORD control sent by
 * revoke_send_cb: records failure in the shared revoke state and, when
 * the last outstanding control finishes, marks the revoke as finished
 * so the ctdb_revoke_all_delegations() wait loop can exit.
 */
1678 static void update_record_cb(struct ctdb_client_control_state *state)
1680 struct ctdb_revoke_state *revoke_state;
1684 if (state == NULL) {
1687 revoke_state = state->async.private_data;
/* prevent re-entry: ctdb_control_recv below may invoke callbacks */
1689 state->async.fn = NULL;
1690 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1691 if ((ret != 0) || (res != 0)) {
1692 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
/* any single node failing makes the whole revoke attempt fail */
1693 revoke_state->status = -1;
1696 revoke_state->count--;
1697 if (revoke_state->count <= 0) {
1698 revoke_state->finished = 1;
/*
 * Tracking-db traverse callback: for each node holding a delegation,
 * asynchronously send an UPDATE_RECORD control pushing the new record
 * header/data, wiring completion through update_record_cb and bumping
 * the outstanding-control count.
 */
1702 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1704 struct ctdb_revoke_state *revoke_state = private_data;
1705 struct ctdb_client_control_state *state;
1707 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1708 if (state == NULL) {
1709 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
/* mark failure; update_record_cb will never run for this node */
1710 revoke_state->status = -1;
1713 state->async.fn = update_record_cb;
1714 state->async.private_data = revoke_state;
1716 revoke_state->count++;
/*
 * Timed-event safety net: if the revoke has not completed within the
 * control timeout, force the wait loop in ctdb_revoke_all_delegations()
 * to terminate (status is left as-is / failed by the callbacks).
 */
1720 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1721 struct timeval yt, void *private_data)
1723 struct ctdb_revoke_state *state = private_data;
1725 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1726 state->finished = 1;
/*
 * Revoke all read-only delegations for one record (runs in the revoke
 * child after it has switched to client mode).
 *
 * Sends UPDATE_RECORD to every delegation holder listed in the tracking
 * db (tdata), waits (with timeout) for all to complete, then re-locks
 * and re-validates the local record (rsn unchanged, revoke flags still
 * set) before writing the outcome: on success the record is marked
 * CTDB_REC_RO_REVOKE_COMPLETE, on failure REVOKING_READONLY is cleared
 * so the revoke can be retried.
 *
 * NOTE(review): extract is missing interleaved lines (state member
 * initialisation for key/data, error returns, final return/free);
 * comments cover visible logic only.
 */
1730 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1732 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1733 struct ctdb_ltdb_header new_header;
1736 state->ctdb_db = ctdb_db;
1738 state->header = header;
/* fire an UPDATE_RECORD at every node recorded in the tracking db */
1741 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1743 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
/* pump the event loop until all controls complete or the timer fires */
1745 while (state->finished == 0) {
1746 event_loop_once(ctdb->ev);
1749 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1750 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1754 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1755 ctdb_ltdb_unlock(ctdb_db, key);
1756 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* record changed under us since the revoke started: abort */
1761 if (new_header.rsn > header->rsn) {
1762 ctdb_ltdb_unlock(ctdb_db, key);
1763 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
/* both REVOKING_READONLY and HAVE_DELEGATIONS must still be set */
1767 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1768 ctdb_ltdb_unlock(ctdb_db, key);
1769 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1775 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1776 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1778 if (state->status == 0) {
1780 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1782 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1783 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1785 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1786 ctdb_ltdb_unlock(ctdb_db, key);
1787 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1791 ctdb_ltdb_unlock(ctdb_db, key);
/*
 * Kick off revocation of all read-only delegations for a record.
 *
 * Snapshots the delegation list from the read-only tracking tdb, forks
 * a child that (as a ctdb client) runs ctdb_revoke_all_delegations(),
 * and installs an fd event on a status pipe so the parent learns the
 * outcome. The handle is linked into ctdb_db->revokechild_active so
 * incoming calls for the same key can be deferred meanwhile.
 *
 * NOTE(review): extract is missing interleaved lines (pipe() setup,
 * several error paths/returns, the child's exit path); comments cover
 * visible logic only.
 */
1798 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1801 struct revokechild_handle *rc;
1802 pid_t parent = getpid();
/* the header pushed to delegation holders drops all RO flags and marks
 * the record as migrated with data */
1805 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1806 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1809 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1810 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
/* snapshot the tracking record (list of delegation-holder nodes);
 * copy it onto rc so we own it and can free the tdb_fetch buffer */
1814 tdata = tdb_fetch(ctdb_db->rottdb, key);
1815 if (tdata.dsize > 0) {
1819 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1825 rc->ctdb_db = ctdb_db;
1829 talloc_set_destructor(rc, revokechild_destructor);
1831 rc->key.dsize = key.dsize;
1832 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1833 if (rc->key.dptr == NULL) {
1834 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1841 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1847 rc->child = ctdb_fork(ctdb);
1848 if (rc->child == (pid_t)-1) {
1849 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
/* ---- child process ---- */
1854 if (rc->child == 0) {
1857 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1859 ctdb_set_process_name("ctdb_revokechild");
/* become a regular ctdb client so we can send controls to other nodes */
1860 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1861 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1863 goto child_finished;
1866 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report the single status byte to the parent over the pipe */
1869 sys_write(rc->fd[1], &c, 1);
1870 /* make sure we die when our parent dies */
1871 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
/* ---- parent process ---- */
1879 set_close_on_exec(rc->fd[0]);
1881 /* This is an active revokechild child process */
1882 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
/* watch the read end of the status pipe for the child's result */
1884 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1885 EVENT_FD_READ, revokechild_handler,
1887 if (rc->fde == NULL) {
1888 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1891 tevent_fd_set_auto_close(rc->fde);
1896 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1898 struct revokechild_handle *rc;
1899 struct revokechild_deferred_call *deferred_call;
1901 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1902 if (rc->key.dsize == 0) {
1905 if (rc->key.dsize != key.dsize) {
1908 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1914 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1918 deferred_call = talloc(rc, struct revokechild_deferred_call);
1919 if (deferred_call == NULL) {
1920 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1924 deferred_call->ctdb = ctdb;
1925 deferred_call->hdr = hdr;
1926 deferred_call->fn = fn;
1927 deferred_call->ctx = call_context;
1929 talloc_set_destructor(deferred_call, deferred_call_destructor);
1930 talloc_steal(deferred_call, hdr);