2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record {
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
38 find the ctdb_db from a db index
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
73 struct ctdb_reply_error *r;
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
83 msg = talloc_vasprintf(ctdb, fmt, ap);
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. But there is a race: The record may have
117 * been migrated off the DMASTER while the redirected packet is
118 * on the wire (or in the local queue). So in case the record has
119 * migrated off the new destinaton of the call packet, instead of
120 * going back to the LMASTER to get the new DMASTER, we try to
121 * reduce round-trips by first chasing the record a couple of times
122 * before giving up the direct chase and finally going back to the
123 * LMASTER (again). Note that this works because of this: When
124 * a record is migrated off a node, then the new DMASTER is stored
125 * in the record's copy on the former DMASTER.
127 * The maximum number of attempts for direct chase to make before
128 * going back to the LMASTER is configurable by the tunable
129 * "MaxRedirectCount".
131 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
132 struct ctdb_db_context *ctdb_db,
134 struct ctdb_req_call *c,
135 struct ctdb_ltdb_header *header)
138 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
140 c->hdr.destnode = lmaster;
141 if (ctdb->pnn == lmaster) {
142 c->hdr.destnode = header->dmaster;
146 if (c->hopcount%100 == 99) {
147 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
148 "key:0x%08x pnn:%d src:%d lmaster:%d "
149 "header->dmaster:%d dst:%d\n",
150 c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
151 ctdb->pnn, c->hdr.srcnode, lmaster,
152 header->dmaster, c->hdr.destnode));
155 ctdb_queue_packet(ctdb, &c->hdr);
162 caller must have the chainlock before calling this routine. Caller must be
165 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
166 struct ctdb_ltdb_header *header,
167 TDB_DATA key, TDB_DATA data,
168 uint32_t new_dmaster,
171 struct ctdb_context *ctdb = ctdb_db->ctdb;
172 struct ctdb_reply_dmaster *r;
176 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
177 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181 header->dmaster = new_dmaster;
182 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
184 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188 if (ctdb->methods == NULL) {
189 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193 /* put the packet on a temporary context, allowing us to safely free
194 it below even if ctdb_reply_dmaster() has freed it already */
195 tmp_ctx = talloc_new(ctdb);
197 /* send the CTDB_REPLY_DMASTER */
198 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
199 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
200 struct ctdb_reply_dmaster);
201 CTDB_NO_MEMORY_FATAL(ctdb, r);
203 r->hdr.destnode = new_dmaster;
204 r->hdr.reqid = reqid;
205 r->rsn = header->rsn;
206 r->keylen = key.dsize;
207 r->datalen = data.dsize;
208 r->db_id = ctdb_db->db_id;
209 memcpy(&r->data[0], key.dptr, key.dsize);
210 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
211 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
213 ctdb_queue_packet(ctdb, &r->hdr);
215 talloc_free(tmp_ctx);
219 send a dmaster request (give another node the dmaster for a record)
221 This is always sent to the lmaster, which ensures that the lmaster
222 always knows who the dmaster is. The lmaster will then send a
223 CTDB_REPLY_DMASTER to the new dmaster
225 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
226 struct ctdb_req_call *c,
227 struct ctdb_ltdb_header *header,
228 TDB_DATA *key, TDB_DATA *data)
230 struct ctdb_req_dmaster *r;
231 struct ctdb_context *ctdb = ctdb_db->ctdb;
233 uint32_t lmaster = ctdb_lmaster(ctdb, key);
235 if (ctdb->methods == NULL) {
236 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
240 if (data->dsize != 0) {
241 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
244 if (lmaster == ctdb->pnn) {
245 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
246 c->hdr.srcnode, c->hdr.reqid);
250 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
252 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
253 struct ctdb_req_dmaster);
254 CTDB_NO_MEMORY_FATAL(ctdb, r);
255 r->hdr.destnode = lmaster;
256 r->hdr.reqid = c->hdr.reqid;
258 r->rsn = header->rsn;
259 r->dmaster = c->hdr.srcnode;
260 r->keylen = key->dsize;
261 r->datalen = data->dsize;
262 memcpy(&r->data[0], key->dptr, key->dsize);
263 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
264 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
266 header->dmaster = c->hdr.srcnode;
267 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
268 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
271 ctdb_queue_packet(ctdb, &r->hdr);
276 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
277 struct timeval t, void *private_data)
279 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
280 struct ctdb_sticky_record);
282 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
283 if (sr->pindown != NULL) {
284 talloc_free(sr->pindown);
290 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
292 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
294 struct ctdb_sticky_record *sr;
296 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
298 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
299 talloc_free(tmp_ctx);
303 k[0] = (key.dsize + 3) / 4 + 1;
304 memcpy(&k[1], key.dptr, key.dsize);
306 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
308 talloc_free(tmp_ctx);
312 talloc_free(tmp_ctx);
314 if (sr->pindown == NULL) {
315 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
316 sr->pindown = talloc_new(sr);
317 if (sr->pindown == NULL) {
318 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
321 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
328 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
329 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
331 must be called with the chainlock held. This function releases the chainlock
333 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
334 struct ctdb_req_header *hdr,
335 TDB_DATA key, TDB_DATA data,
336 uint64_t rsn, uint32_t record_flags)
338 struct ctdb_call_state *state;
339 struct ctdb_context *ctdb = ctdb_db->ctdb;
340 struct ctdb_ltdb_header header;
343 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
346 header.rsn = rsn + 1;
347 header.dmaster = ctdb->pnn;
348 header.flags = record_flags;
350 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
353 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
355 * We temporarily add the VACUUM_MIGRATED flag to
356 * the record flags, so that ctdb_ltdb_store can
357 * decide whether the record should be stored or
360 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
364 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
365 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
367 ret = ctdb_ltdb_unlock(ctdb_db, key);
369 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
374 /* we just became DMASTER and this database is "sticky",
375 see if the record is flagged as "hot" and set up a pin-down
376 context to stop migrations for a little while if so
378 if (ctdb_db->sticky) {
379 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
383 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
384 ctdb->pnn, hdr->reqid, hdr->srcnode));
386 ret = ctdb_ltdb_unlock(ctdb_db, key);
388 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
393 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
394 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
396 ret = ctdb_ltdb_unlock(ctdb_db, key);
398 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
403 if (hdr->reqid != state->reqid) {
404 /* we found a record but it was the wrong one */
405 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
407 ret = ctdb_ltdb_unlock(ctdb_db, key);
409 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
414 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
416 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
418 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
421 state->state = CTDB_CALL_DONE;
422 if (state->async.fn) {
423 state->async.fn(state);
430 called when a CTDB_REQ_DMASTER packet comes in
432 this comes into the lmaster for a record when the current dmaster
433 wants to give up the dmaster role and give it to someone else
435 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
437 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
438 TDB_DATA key, data, data2;
439 struct ctdb_ltdb_header header;
440 struct ctdb_db_context *ctdb_db;
441 uint32_t record_flags = 0;
446 key.dsize = c->keylen;
447 data.dptr = c->data + c->keylen;
448 data.dsize = c->datalen;
449 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
451 if (len <= c->hdr.length) {
452 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
455 ctdb_db = find_ctdb_db(ctdb, c->db_id);
457 ctdb_send_error(ctdb, hdr, -1,
458 "Unknown database in request. db_id==0x%08x",
463 /* fetch the current record */
464 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
465 ctdb_call_input_pkt, ctdb, False);
467 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
471 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
475 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
476 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
477 ctdb->pnn, ctdb_lmaster(ctdb, &key),
478 hdr->generation, ctdb->vnn_map->generation));
479 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
482 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
483 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
485 /* its a protocol error if the sending node is not the current dmaster */
486 if (header.dmaster != hdr->srcnode) {
487 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
488 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
489 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
490 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
491 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
492 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
493 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
495 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
496 ctdb_ltdb_unlock(ctdb_db, key);
501 if (header.rsn > c->rsn) {
502 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
503 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
504 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
505 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
508 /* use the rsn from the sending node */
511 /* store the record flags from the sending node */
512 header.flags = record_flags;
514 /* check if the new dmaster is the lmaster, in which case we
515 skip the dmaster reply */
516 if (c->dmaster == ctdb->pnn) {
517 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
519 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
521 ret = ctdb_ltdb_unlock(ctdb_db, key);
523 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
528 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
529 struct timeval t, void *private_data)
531 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
532 struct ctdb_sticky_record);
536 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
539 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
546 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
548 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
550 struct ctdb_sticky_record *sr;
552 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
554 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
555 talloc_free(tmp_ctx);
559 k[0] = (key.dsize + 3) / 4 + 1;
560 memcpy(&k[1], key.dptr, key.dsize);
562 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
564 talloc_free(tmp_ctx);
568 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
570 talloc_free(tmp_ctx);
571 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
576 sr->ctdb_db = ctdb_db;
579 DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
581 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
583 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
585 talloc_free(tmp_ctx);
589 struct pinned_down_requeue_handle {
590 struct ctdb_context *ctdb;
591 struct ctdb_req_header *hdr;
594 struct pinned_down_deferred_call {
595 struct ctdb_context *ctdb;
596 struct ctdb_req_header *hdr;
599 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
600 struct timeval t, void *private_data)
602 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
603 struct ctdb_context *ctdb = handle->ctdb;
605 talloc_steal(ctdb, handle->hdr);
606 ctdb_call_input_pkt(ctdb, handle->hdr);
611 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
613 struct ctdb_context *ctdb = pinned_down->ctdb;
614 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
616 handle->ctdb = pinned_down->ctdb;
617 handle->hdr = pinned_down->hdr;
618 talloc_steal(handle, handle->hdr);
620 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
626 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
628 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
630 struct ctdb_sticky_record *sr;
631 struct pinned_down_deferred_call *pinned_down;
633 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
635 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
636 talloc_free(tmp_ctx);
640 k[0] = (key.dsize + 3) / 4 + 1;
641 memcpy(&k[1], key.dptr, key.dsize);
643 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
645 talloc_free(tmp_ctx);
649 talloc_free(tmp_ctx);
651 if (sr->pindown == NULL) {
655 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
656 if (pinned_down == NULL) {
657 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
661 pinned_down->ctdb = ctdb;
662 pinned_down->hdr = hdr;
664 talloc_set_destructor(pinned_down, pinned_down_destructor);
665 talloc_steal(pinned_down, hdr);
671 called when a CTDB_REQ_CALL packet comes in
673 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
675 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
677 struct ctdb_reply_call *r;
679 struct ctdb_ltdb_header header;
680 struct ctdb_call *call;
681 struct ctdb_db_context *ctdb_db;
682 int tmp_count, bucket;
684 if (ctdb->methods == NULL) {
685 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
690 ctdb_db = find_ctdb_db(ctdb, c->db_id);
692 ctdb_send_error(ctdb, hdr, -1,
693 "Unknown database in request. db_id==0x%08x",
698 call = talloc(hdr, struct ctdb_call);
699 CTDB_NO_MEMORY_FATAL(ctdb, call);
701 call->call_id = c->callid;
702 call->key.dptr = c->data;
703 call->key.dsize = c->keylen;
704 call->call_data.dptr = c->data + c->keylen;
705 call->call_data.dsize = c->calldatalen;
706 call->reply_data.dptr = NULL;
707 call->reply_data.dsize = 0;
710 /* If this record is pinned down we should defer the
711 request until the pindown times out
713 if (ctdb_db->sticky) {
714 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
715 DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
721 /* determine if we are the dmaster for this key. This also
722 fetches the record data (if any), thus avoiding a 2nd fetch of the data
723 if the call will be answered locally */
725 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
726 ctdb_call_input_pkt, ctdb, False);
728 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
732 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
736 /* Dont do READONLY if we dont have a tracking database */
737 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
738 c->flags &= ~CTDB_WANT_READONLY;
741 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
742 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
743 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
744 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
745 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
746 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
748 /* and clear out the tracking data */
749 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
750 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
754 /* if we are revoking, we must defer all other calls until the revoke
757 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
758 talloc_free(data.dptr);
759 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
761 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
762 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
768 /* if we are not the dmaster and are not hosting any delegations,
769 then send a redirect to the requesting node */
770 if ((header.dmaster != ctdb->pnn)
771 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
772 talloc_free(data.dptr);
773 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
775 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
777 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
782 if ( (!(c->flags & CTDB_WANT_READONLY))
783 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
784 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
785 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
786 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
788 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
790 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
791 ctdb_fatal(ctdb, "Failed to start record revoke");
793 talloc_free(data.dptr);
795 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
796 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
803 /* If this is the first request for delegation. bump rsn and set
804 * the delegations flag
806 if ((c->flags & CTDB_WANT_READONLY)
807 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
808 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
810 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
811 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
812 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
815 if ((c->flags & CTDB_WANT_READONLY)
816 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
819 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
820 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
821 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
823 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
824 ctdb_fatal(ctdb, "Failed to store trackingdb data");
828 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
830 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
833 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
834 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
835 struct ctdb_reply_call);
836 CTDB_NO_MEMORY_FATAL(ctdb, r);
837 r->hdr.destnode = c->hdr.srcnode;
838 r->hdr.reqid = c->hdr.reqid;
840 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
842 header.flags |= CTDB_REC_RO_HAVE_READONLY;
843 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
844 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
847 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
850 ctdb_queue_packet(ctdb, &r->hdr);
851 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
852 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
858 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
859 tmp_count = c->hopcount;
865 if (bucket >= MAX_COUNT_BUCKETS) {
866 bucket = MAX_COUNT_BUCKETS - 1;
868 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
869 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
872 /* If this database supports sticky records, then check if the
873 hopcount is big. If it is it means the record is hot and we
874 should make it sticky.
876 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
877 DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
878 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
882 /* if this nodes has done enough consecutive calls on the same record
883 then give them the record
884 or if the node requested an immediate migration
886 if ( c->hdr.srcnode != ctdb->pnn &&
887 ((header.laccessor == c->hdr.srcnode
888 && header.lacount >= ctdb->tunable.max_lacount
889 && ctdb->tunable.max_lacount != 0)
890 || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
891 if (ctdb_db->transaction_active) {
892 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
893 " of key %s while transaction is active\n",
894 (char *)call->key.dptr));
896 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
897 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
898 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
899 talloc_free(data.dptr);
901 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
903 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
909 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
911 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
915 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
917 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
920 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
921 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
922 struct ctdb_reply_call);
923 CTDB_NO_MEMORY_FATAL(ctdb, r);
924 r->hdr.destnode = hdr->srcnode;
925 r->hdr.reqid = hdr->reqid;
926 r->status = call->status;
927 r->datalen = call->reply_data.dsize;
928 if (call->reply_data.dsize) {
929 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
932 ctdb_queue_packet(ctdb, &r->hdr);
938 called when a CTDB_REPLY_CALL packet comes in
940 This packet comes in response to a CTDB_REQ_CALL request packet. It
941 contains any reply data from the call
943 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
945 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
946 struct ctdb_call_state *state;
948 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
950 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
954 if (hdr->reqid != state->reqid) {
955 /* we found a record but it was the wrong one */
956 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
961 /* read only delegation processing */
962 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
963 * delegation since we may need to update the record header
965 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
966 struct ctdb_db_context *ctdb_db = state->ctdb_db;
967 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
968 struct ctdb_ltdb_header oldheader;
969 TDB_DATA key, data, olddata;
972 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
977 key.dsize = state->c->keylen;
978 key.dptr = state->c->data;
979 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
980 ctdb_call_input_pkt, ctdb, False);
985 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
989 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
991 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
992 ctdb_ltdb_unlock(ctdb_db, key);
996 if (header->rsn <= oldheader.rsn) {
997 ctdb_ltdb_unlock(ctdb_db, key);
1001 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1002 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1003 ctdb_ltdb_unlock(ctdb_db, key);
1007 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1008 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1009 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1011 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1012 ctdb_ltdb_unlock(ctdb_db, key);
1016 ctdb_ltdb_unlock(ctdb_db, key);
1020 state->call->reply_data.dptr = c->data;
1021 state->call->reply_data.dsize = c->datalen;
1022 state->call->status = c->status;
1024 talloc_steal(state, c);
1026 state->state = CTDB_CALL_DONE;
1027 if (state->async.fn) {
1028 state->async.fn(state);
1034 called when a CTDB_REPLY_DMASTER packet comes in
1036 This packet comes in from the lmaster response to a CTDB_REQ_CALL
1037 request packet. It means that the current dmaster wants to give us
1040 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1042 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1043 struct ctdb_db_context *ctdb_db;
1045 uint32_t record_flags = 0;
1049 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1050 if (ctdb_db == NULL) {
1051 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1056 key.dsize = c->keylen;
1057 data.dptr = &c->data[key.dsize];
1058 data.dsize = c->datalen;
1059 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1061 if (len <= c->hdr.length) {
1062 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
1065 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1066 ctdb_call_input_pkt, ctdb, False);
1071 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1075 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1080 called when a CTDB_REPLY_ERROR packet comes in
1082 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1084 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1085 struct ctdb_call_state *state;
1087 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1088 if (state == NULL) {
1089 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1090 ctdb->pnn, hdr->reqid));
1094 if (hdr->reqid != state->reqid) {
1095 /* we found a record but it was the wrong one */
1096 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1100 talloc_steal(state, c);
1102 state->state = CTDB_CALL_ERROR;
1103 state->errmsg = (char *)c->msg;
1104 if (state->async.fn) {
1105 state->async.fn(state);
1113 static int ctdb_call_destructor(struct ctdb_call_state *state)
1115 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1116 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1122 called when a ctdb_call needs to be resent after a reconfigure event
1124 static void ctdb_call_resend(struct ctdb_call_state *state)
1126 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1128 state->generation = ctdb->vnn_map->generation;
1130 /* use a new reqid, in case the old reply does eventually come in */
1131 ctdb_reqid_remove(ctdb, state->reqid);
1132 state->reqid = ctdb_reqid_new(ctdb, state);
1133 state->c->hdr.reqid = state->reqid;
1135 /* update the generation count for this request, so its valid with the new vnn_map */
1136 state->c->hdr.generation = state->generation;
1138 /* send the packet to ourselves, it will be redirected appropriately */
1139 state->c->hdr.destnode = ctdb->pnn;
1141 ctdb_queue_packet(ctdb, &state->c->hdr);
1142 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1146 resend all pending calls on recovery
1148 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1150 struct ctdb_call_state *state, *next;
1151 for (state=ctdb->pending_calls;state;state=next) {
1153 ctdb_call_resend(state);
1158 this allows the caller to setup a async.fn
1160 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1161 struct timeval t, void *private_data)
1163 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1164 if (state->async.fn) {
1165 state->async.fn(state);
1171 construct an event driven local ctdb_call
1173 this is used so that locally processed ctdb_call requests are processed
1174 in an event driven manner
1176 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1177 struct ctdb_call *call,
1178 struct ctdb_ltdb_header *header,
1181 struct ctdb_call_state *state;
1182 struct ctdb_context *ctdb = ctdb_db->ctdb;
1185 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1186 CTDB_NO_MEMORY_NULL(ctdb, state);
1188 talloc_steal(state, data->dptr);
1190 state->state = CTDB_CALL_DONE;
1191 state->call = talloc(state, struct ctdb_call);
1192 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1193 *(state->call) = *call;
1194 state->ctdb_db = ctdb_db;
1196 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
1198 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1201 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1208 make a remote ctdb call - async send. Called in daemon context.
1210 This constructs a ctdb_call request and queues it for processing.
1211 This call never blocks.
1213 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1214 struct ctdb_call *call,
1215 struct ctdb_ltdb_header *header)
1218 struct ctdb_call_state *state;
1219 struct ctdb_context *ctdb = ctdb_db->ctdb;
1221 if (ctdb->methods == NULL) {
1222 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1226 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1227 CTDB_NO_MEMORY_NULL(ctdb, state);
1228 state->call = talloc(state, struct ctdb_call);
1229 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1231 state->reqid = ctdb_reqid_new(ctdb, state);
1232 state->ctdb_db = ctdb_db;
1233 talloc_set_destructor(state, ctdb_call_destructor);
1235 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1236 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1237 struct ctdb_req_call);
1238 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1239 state->c->hdr.destnode = header->dmaster;
1241 /* this limits us to 16k outstanding messages - not unreasonable */
1242 state->c->hdr.reqid = state->reqid;
1243 state->c->flags = call->flags;
1244 state->c->db_id = ctdb_db->db_id;
1245 state->c->callid = call->call_id;
1246 state->c->hopcount = 0;
1247 state->c->keylen = call->key.dsize;
1248 state->c->calldatalen = call->call_data.dsize;
1249 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1250 memcpy(&state->c->data[call->key.dsize],
1251 call->call_data.dptr, call->call_data.dsize);
1252 *(state->call) = *call;
1253 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1254 state->call->key.dptr = &state->c->data[0];
1256 state->state = CTDB_CALL_WAIT;
1257 state->generation = ctdb->vnn_map->generation;
1259 DLIST_ADD(ctdb->pending_calls, state);
1261 ctdb_queue_packet(ctdb, &state->c->hdr);
1267 make a remote ctdb call - async recv - called in daemon context
1269 This is called when the program wants to wait for a ctdb_call to complete and get the
1270 results. This call will block unless the call has already completed.
1272 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1274 while (state->state < CTDB_CALL_DONE) {
1275 event_loop_once(state->ctdb_db->ctdb->ev);
1277 if (state->state != CTDB_CALL_DONE) {
1278 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1283 if (state->call->reply_data.dsize) {
1284 call->reply_data.dptr = talloc_memdup(call,
1285 state->call->reply_data.dptr,
1286 state->call->reply_data.dsize);
1287 call->reply_data.dsize = state->call->reply_data.dsize;
1289 call->reply_data.dptr = NULL;
1290 call->reply_data.dsize = 0;
1292 call->status = state->call->status;
1299 send a keepalive packet to the other node
1301 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1303 struct ctdb_req_keepalive *r;
1305 if (ctdb->methods == NULL) {
1306 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1310 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1311 sizeof(struct ctdb_req_keepalive),
1312 struct ctdb_req_keepalive);
1313 CTDB_NO_MEMORY_FATAL(ctdb, r);
1314 r->hdr.destnode = destnode;
1317 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1319 ctdb_queue_packet(ctdb, &r->hdr);
1326 struct revokechild_deferred_call {
1327 struct ctdb_context *ctdb;
1328 struct ctdb_req_header *hdr;
1329 deferred_requeue_fn fn;
1333 struct revokechild_handle {
1334 struct revokechild_handle *next, *prev;
1335 struct ctdb_context *ctdb;
1336 struct ctdb_db_context *ctdb_db;
1337 struct fd_event *fde;
1344 struct revokechild_requeue_handle {
1345 struct ctdb_context *ctdb;
1346 struct ctdb_req_header *hdr;
1347 deferred_requeue_fn fn;
1351 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1352 struct timeval t, void *private_data)
1354 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1356 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1357 talloc_free(requeue_handle);
1360 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1362 struct ctdb_context *ctdb = deferred_call->ctdb;
1363 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1364 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1366 requeue_handle->ctdb = ctdb;
1367 requeue_handle->hdr = deferred_call->hdr;
1368 requeue_handle->fn = deferred_call->fn;
1369 requeue_handle->ctx = deferred_call->ctx;
1370 talloc_steal(requeue_handle, requeue_handle->hdr);
1372 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1373 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1379 static int revokechild_destructor(struct revokechild_handle *rc)
1381 if (rc->fde != NULL) {
1382 talloc_free(rc->fde);
1385 if (rc->fd[0] != -1) {
1388 if (rc->fd[1] != -1) {
1391 kill(rc->child, SIGKILL);
1393 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1397 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1398 uint16_t flags, void *private_data)
1400 struct revokechild_handle *rc = talloc_get_type(private_data,
1401 struct revokechild_handle);
1405 ret = read(rc->fd[0], &c, 1);
1407 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1413 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1422 struct ctdb_revoke_state {
1423 struct ctdb_db_context *ctdb_db;
1425 struct ctdb_ltdb_header *header;
1432 static void update_record_cb(struct ctdb_client_control_state *state)
1434 struct ctdb_revoke_state *revoke_state;
1438 if (state == NULL) {
1441 revoke_state = state->async.private_data;
1443 state->async.fn = NULL;
1444 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1445 if ((ret != 0) || (res != 0)) {
1446 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1447 revoke_state->status = -1;
1450 revoke_state->count--;
1451 if (revoke_state->count <= 0) {
1452 revoke_state->finished = 1;
1456 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1458 struct ctdb_revoke_state *revoke_state = private_data;
1459 struct ctdb_client_control_state *state;
1461 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1462 if (state == NULL) {
1463 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1464 revoke_state->status = -1;
1467 state->async.fn = update_record_cb;
1468 state->async.private_data = revoke_state;
1470 revoke_state->count++;
1474 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1475 struct timeval yt, void *private_data)
1477 struct ctdb_revoke_state *state = private_data;
1479 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1480 state->finished = 1;
1484 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1486 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1489 state->ctdb_db = ctdb_db;
1491 state->header = header;
1494 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1496 event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
1498 while (state->finished == 0) {
1499 event_loop_once(ctdb->ev);
1502 status = state->status;
1505 struct ctdb_ltdb_header new_header;
1508 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1509 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1513 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1514 ctdb_ltdb_unlock(ctdb_db, key);
1515 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1520 if (new_header.rsn > header->rsn) {
1521 ctdb_ltdb_unlock(ctdb_db, key);
1522 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1526 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1527 ctdb_ltdb_unlock(ctdb_db, key);
1528 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1533 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1534 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1535 ctdb_ltdb_unlock(ctdb_db, key);
1536 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1540 ctdb_ltdb_unlock(ctdb_db, key);
1548 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1551 struct revokechild_handle *rc;
1552 pid_t parent = getpid();
1555 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1556 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1559 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1560 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1564 tdata = tdb_fetch(ctdb_db->rottdb, key);
1565 if (tdata.dsize > 0) {
1569 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1575 rc->ctdb_db = ctdb_db;
1579 talloc_set_destructor(rc, revokechild_destructor);
1581 rc->key.dsize = key.dsize;
1582 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1583 if (rc->key.dptr == NULL) {
1584 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1591 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1597 rc->child = ctdb_fork(ctdb);
1598 if (rc->child == (pid_t)-1) {
1599 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1604 if (rc->child == 0) {
1607 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1609 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1610 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1612 goto child_finished;
1615 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1618 write(rc->fd[1], &c, 1);
1619 /* make sure we die when our parent dies */
1620 while (kill(parent, 0) == 0 || errno != ESRCH) {
1628 set_close_on_exec(rc->fd[0]);
1630 /* This is an active revokechild child process */
1631 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1633 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1634 EVENT_FD_READ, revokechild_handler,
1636 if (rc->fde == NULL) {
1637 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1640 tevent_fd_set_auto_close(rc->fde);
1645 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1647 struct revokechild_handle *rc;
1648 struct revokechild_deferred_call *deferred_call;
1650 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1651 if (rc->key.dsize == 0) {
1654 if (rc->key.dsize != key.dsize) {
1657 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1663 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1667 deferred_call = talloc(rc, struct revokechild_deferred_call);
1668 if (deferred_call == NULL) {
1669 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1673 deferred_call->ctdb = ctdb;
1674 deferred_call->hdr = hdr;
1675 deferred_call->fn = fn;
1676 deferred_call->ctx = call_context;
1678 talloc_set_destructor(deferred_call, deferred_call_destructor);
1679 talloc_steal(deferred_call, hdr);