struct ctdb_marshall_buffer *reply;
struct ctdb_rec_data *rec;
int i;
+ int32_t transaction_active = 0;
TALLOC_CTX *tmp_ctx = talloc_new(recdb);
ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
}
rec = (struct ctdb_rec_data *)&reply->data[0];
+
+ if (persistent) {
+ transaction_active = ctdb_ctrl_transaction_active(ctdb, srcnode,
+ dbid);
+ if (transaction_active == -1) {
+ DEBUG(DEBUG_ERR, (__location__ " error calling "
+ "ctdb_ctrl_transaction_active to node"
+ " %u\n", srcnode));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+ }
for (i=0;
i<reply->count;
}
header = *(struct ctdb_ltdb_header *)existing.dptr;
free(existing.dptr);
- if (!(header.rsn < hdr->rsn ||
- (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
- continue;
+ if (!persistent) {
+ if (!(header.rsn < hdr->rsn ||
+ (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn)))
+ {
+ continue;
+ }
+ } else {
+ if (header.lacount == (uint32_t)-1) {
+ /*
+ * skip record if the stored copy came
+ * from a node with active transaction
+ */
+ continue;
+ }
+
+ if ((header.rsn >= hdr->rsn) &&
+ !transaction_active)
+ {
+ continue;
+ }
}
}
-
+
+ if (persistent) {
+ /*
+ * Misuse the lacount field to signal
+ * that we got the record from a node
+ * that has a transaction running.
+ */
+ if (transaction_active) {
+ hdr->lacount = (uint32_t)-1;
+ } else {
+ hdr->lacount = 0;
+ }
+ }
+
if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
talloc_free(tmp_ctx);
hdr = (struct ctdb_ltdb_header *)data.dptr;
if (!params->persistent) {
hdr->dmaster = params->ctdb->pnn;
+ } else {
+ /*
+ * Clear the lacount field that had been misused
+ * when pulling the db in order to keep track of
+ * whether the node had a transaction running.
+ */
+ hdr->lacount = 0;
}
/* add the record to the blob ready to send to the nodes */