4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
  State shared by all CTDB_CONTROL_UPDATE_RECORD controls that are sent
  out on behalf of one commit request (one instance is allocated per
  call of ctdb_control_trans2_commit and handed to every send).

  NOTE(review): only two members are visible in this listing. Code in
  this file also accesses status, errormsg and num_pending through this
  struct, so the remaining members and the closing brace lie outside the
  lines shown here.
*/
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb; /* daemon context used when sending the reply */
31 struct ctdb_req_control *c; /* the client control that is eventually replied to */
38 called when a node has acknowledged a ctdb_control_update_record call
/*
  Completion callback passed to ctdb_daemon_send_control() by
  ctdb_control_trans2_commit().

  private_data is the shared struct ctdb_persistent_state. On the failure
  path the callback logs the status and copies status/errormsg into the
  state; when state->num_pending is zero it answers the original control
  with the stored status and error message.

  NOTE(review): the end of the parameter list, the test guarding the
  failure path and the statement that updates num_pending are not visible
  in this listing - confirm against the full source.
*/
40 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
41 int32_t status, TDB_DATA data,
/* recover the typed state from the opaque callback pointer */
45 struct ctdb_persistent_state *state = talloc_get_type(private_data,
46 struct ctdb_persistent_state);
49 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
/* remember the failure so it is what the client finally gets back */
51 state->status = status;
52 state->errormsg = errormsg;
/* no control outstanding any more: reply to the originating control */
55 if (state->num_pending == 0) {
56 ctdb_request_control_reply(state->ctdb, state->c, NULL, state->status, state->errormsg);
62 called if persistent store times out
/*
  Timed-event handler registered by ctdb_control_trans2_commit() via
  event_add_timed(). private_data is the struct ctdb_persistent_state of
  the commit; the handler answers the original control with status -1 and
  the message "timeout in ctdb_persistent_state".

  ev, te and t are not used in the visible lines.

  NOTE(review): whatever follows the reply (e.g. releasing the state) is
  not visible in this listing.
*/
64 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
65 struct timeval t, void *private_data)
67 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
69 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
75 store a set of persistent records - called from a ctdb client when it has updated
76 some records in a persistent database. The client will have the record
77 locked for the duration of this call. The client is the dmaster when
/*
  Control handler for a commit of persistent records issued by a client.

  Visible behaviour:
    - looks up the issuing client from c->client_id;
    - adjusts client->num_persistent_updates depending on c->opcode
      (decrement, if positive, for CTDB_CONTROL_PERSISTENT_STORE; an
      increment is also present - presumably the else branch, the branch
      keyword itself is not visible);
    - allocates a zeroed struct ctdb_persistent_state owned by ctdb;
    - walks ctdb->vnn_map and sends CTDB_CONTROL_UPDATE_RECORD with the
      unmodified recdata to every node that is not flagged
      NODE_FLAGS_INACTIVE and is not this node, counting each control
      sent in state->num_pending;
    - if anything is pending, reparents c under state so it stays alive
      and arms a timer of ctdb->tunable.control_timeout (presumably
      seconds) that fires ctdb_persistent_store_timeout.

  Parameters:
    ctdb        - daemon context
    c           - the incoming control; replied to later by the callbacks
    recdata     - marshalled record data, forwarded as-is to the peers
    async_reply - out flag; its assignment is not visible in this listing

  NOTE(review): return statements, the NULL test on client, the
  initialisation of state->ctdb / state->c and the error handling after
  ctdb_daemon_send_control() are not visible here. The return value of
  event_add_timed() is not checked in the visible lines.
*/
80 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
81 struct ctdb_req_control *c,
82 TDB_DATA recdata, bool *async_reply)
/* map the control back to the client connection that sent it */
84 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
85 struct ctdb_persistent_state *state;
89 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
93 /* handling num_persistent_updates is a bit strange -
95 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
96 They don't expect num_persistent_updates to be used at all
98 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
99 this commit to then decrement it
101 3) new clients which use TRANS2 commit functions, and
102 expect this function to increment the counter, and
103 then have it decremented in ctdb_control_trans2_error
104 or ctdb_control_trans2_finished
106 if (c->opcode == CTDB_CONTROL_PERSISTENT_STORE) {
107 if (client->num_persistent_updates > 0) {
108 client->num_persistent_updates--;
111 client->num_persistent_updates++;
114 state = talloc_zero(ctdb, struct ctdb_persistent_state);
115 CTDB_NO_MEMORY(ctdb, state);
120 for (i=0;i<ctdb->vnn_map->size;i++) {
121 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
124 /* only send to active nodes */
125 if (node->flags & NODE_FLAGS_INACTIVE) {
129 /* don't send to ourselves */
130 if (node->pnn == ctdb->pnn) {
/* every reply is delivered to ctdb_persistent_callback with the shared state */
134 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
135 c->client_id, 0, recdata,
136 ctdb_persistent_callback, state);
138 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
/* one more reply to wait for */
143 state->num_pending++;
/* nothing was sent, so there is no reply to wait for */
146 if (state->num_pending == 0) {
151 /* we need to wait for the replies */
154 /* need to keep the control structure around */
155 talloc_steal(state, c);
157 /* but we won't wait forever */
158 event_add_timed(ctdb->ev, state,
159 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
160 ctdb_persistent_store_timeout, state);
/*
  State for one local write of persistent records, created in
  ctdb_control_update_record() and consumed by ctdb_persistent_store()
  and the write/timeout callbacks.

  NOTE(review): the closing brace (and any further members) are not
  visible in this listing.
*/
166 struct ctdb_persistent_write_state {
167 struct ctdb_db_context *ctdb_db; /* database the records are written to */
168 struct ctdb_marshall_buffer *m; /* marshalled records iterated by ctdb_persistent_store() */
169 struct ctdb_req_control *c; /* control answered once the write finishes or times out */
174 called from a child process to write the data
/*
  Write every record of state->m into state->ctdb_db inside a single tdb
  transaction.

  Visible steps:
    1. start a transaction on the database's local tdb;
    2. for each of the m->count records, unpack the next record (header,
       key, data) with ctdb_marshall_loop_next(), fetch the currently
       stored header, compare RSNs and store the new record;
    3. commit the transaction;
    4. a tdb_transaction_cancel() call is present after the commit -
       presumably the shared failure exit, the label is not visible.

  A record whose stored RSN is greater than or equal to the incoming RSN
  triggers a DEBUG_CRIT message.

  NOTE(review): the tests on each ret value, the jumps to the failure
  exit and the return statements are not visible in this listing, so the
  exact return values cannot be stated from here.
*/
176 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
/* rec is the iteration cursor for ctdb_marshall_loop_next(); NULL starts at the first record */
179 struct ctdb_rec_data *rec = NULL;
180 struct ctdb_marshall_buffer *m = state->m;
182 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
184 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
185 state->ctdb_db->db_id));
189 for (i=0;i<m->count;i++) {
190 struct ctdb_ltdb_header oldheader;
191 struct ctdb_ltdb_header header;
/* advance to record i and unpack its header, key and data */
194 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
197 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
198 i, state->ctdb_db->db_id));
202 /* fetch the old header and ensure the rsn is less than the new rsn */
203 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, NULL, NULL);
205 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
206 state->ctdb_db->db_id));
/* an equal RSN is also treated as "not newer" (note the >=) */
210 if (oldheader.rsn >= header.rsn) {
211 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
212 state->ctdb_db->db_id,
213 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
217 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
219 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
220 state->ctdb_db->db_id));
/* make all stored records visible atomically */
225 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
227 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
228 state->ctdb_db->db_id));
/* NOTE(review): presumably reached only on failure - confirm the control flow */
235 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
241 called when the child has completed the persistent write
/*
  Completion callback given to ctdb_childwrite() by
  ctdb_control_update_record(). private_data is the
  struct ctdb_persistent_write_state of the request; status is passed
  straight through as the status of the control reply, with no reply
  data and no error message.

  NOTE(review): anything done after the reply (e.g. freeing the state)
  is not visible in this listing.
*/
244 static void ctdb_persistent_write_callback(int status, void *private_data)
246 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
247 struct ctdb_persistent_write_state);
250 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
256 called if our lockwait child times out
/*
  Timed-event handler registered by ctdb_control_update_record().
  private_data is the struct ctdb_persistent_write_state of the request;
  the handler answers the control with status -1 and the message
  "timeout in ctdb_persistent_lock".

  ev, te and t are not used in the visible lines.

  NOTE(review): cleanup following the reply is not visible in this
  listing.
*/
258 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
259 struct timeval t, void *private_data)
261 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
262 struct ctdb_persistent_write_state);
263 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
  Bookkeeping for one forked child that performs a persistent write.

  NOTE(review): this listing omits some members - the code in this file
  also uses child (pid of the forked process), fd (the two pipe ends) and
  private_data through this struct - as well as the closing brace.
*/
267 struct childwrite_handle {
268 struct ctdb_context *ctdb; /* daemon context, used for the statistics counters */
269 struct ctdb_db_context *ctdb_db; /* database being written */
270 struct fd_event *fde; /* read event on the parent's end of the pipe */
274 void (*callback)(int, void *); /* completion callback supplied to ctdb_childwrite() */
275 struct timeval start_time; /* set when the child was started; input to the latency statistic */
/*
  talloc destructor installed on a childwrite_handle by
  ctdb_childwrite(): decrements the pending_childwrite_calls statistic
  and sends SIGKILL to the child process.

  NOTE(review): the return statement is not visible in this listing.
*/
278 static int childwrite_destructor(struct childwrite_handle *h)
280 h->ctdb->statistics.pending_childwrite_calls--;
281 kill(h->child, SIGKILL);
285 /* called when the child process has finished writing the record to the
// fd-event handler registered by ctdb_childwrite() on the read end of the
// pipe to the child. private_data is the childwrite_handle.
//
// Visible steps: copy private_data, callback and child pid out of the
// handle, record the latency statistic, decrement
// pending_childwrite_calls, move the handle under a temporary talloc
// context, clear the handle's destructor, read one byte from the pipe,
// SIGKILL the child and free the temporary context (which owns the
// handle at that point).
//
// NOTE(review): the invocation of the callback and the handling of the
// byte that was read are not visible in this listing.
288 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
289 uint16_t flags, void *private_data)
291 struct childwrite_handle *h = talloc_get_type(private_data,
292 struct childwrite_handle);
// keep copies: the handle is released together with tmp_ctx further down
293 void *p = h->private_data;
294 void (*callback)(int, void *) = h->callback;
295 pid_t child = h->child;
296 TALLOC_CTX *tmp_ctx = talloc_new(ev);
300 ctdb_latency(&h->ctdb->statistics.max_childwrite_latency, h->start_time);
// the destructor would decrement this too; it is cleared below
301 h->ctdb->statistics.pending_childwrite_calls--;
303 /* the handle needs to go away when the context is gone - when
304 the handle goes away this implicitly closes the pipe, which
306 talloc_steal(tmp_ctx, h);
308 talloc_set_destructor(h, NULL);
310 ret = read(h->fd[0], &c, 1);
312 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
318 kill(child, SIGKILL);
319 talloc_free(tmp_ctx);
322 /* this creates a child process which will take out a tdb transaction
323 and write the record to the database.
// Fork a child process that runs ctdb_persistent_store(state) and reports
// completion to the parent by writing one byte to a pipe.
//
// Parameters:
//   ctdb_db  - database to write; also gives access to the daemon context
//   callback - stored in the handle for use by childwrite_handler()
//   state    - write request; talloc parent of the returned handle and the
//              private_data stored in the handle
//
// Returns the new handle. The caller in this file treats NULL as failure.
//
// Parent side: closes the write end, installs childwrite_destructor,
// registers childwrite_handler for readability of the read end
// (EVENT_FD_AUTOCLOSE) and records the start time.
// Child side: closes the read end, performs the store, writes one byte
// and then loops for as long as the parent process still exists.
//
// NOTE(review): return statements, the freeing of result on the error
// paths and the value assigned to the byte c are not visible in this
// listing. The result of write() is ignored in the visible code.
325 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
326 void (*callback)(int, void *private_data),
327 struct ctdb_persistent_write_state *state)
329 struct childwrite_handle *result;
// remembered before fork() so the child can probe whether the parent is alive
331 pid_t parent = getpid();
333 ctdb_db->ctdb->statistics.childwrite_calls++;
334 ctdb_db->ctdb->statistics.pending_childwrite_calls++;
// every failure path below undoes the pending counter increment
336 if (!(result = talloc_zero(state, struct childwrite_handle))) {
337 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
341 ret = pipe(result->fd);
345 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
349 result->child = fork();
351 if (result->child == (pid_t)-1) {
352 close(result->fd[0]);
353 close(result->fd[1]);
355 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
359 result->callback = callback;
360 result->private_data = state;
361 result->ctdb = ctdb_db->ctdb;
362 result->ctdb_db = ctdb_db;
// child process
364 if (result->child == 0) {
367 close(result->fd[0]);
368 ret = ctdb_persistent_store(state);
370 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
// single byte tells the parent the write attempt is over
374 write(result->fd[1], &c, 1);
376 /* make sure we die when our parent dies */
377 while (kill(parent, 0) == 0 || errno != ESRCH) {
// parent process: only the read end is needed here
383 close(result->fd[1]);
384 talloc_set_destructor(result, childwrite_destructor);
386 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
387 EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
389 if (result->fde == NULL) {
391 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
395 result->start_time = timeval_current();
401 update a record on this node if the new record has a higher rsn than the
/*
  Control handler for CTDB_CONTROL_UPDATE_RECORD: write the marshalled
  records in recdata to the local copy of a database using a child
  process.

  Visible steps:
    - log and reject when ctdb->recovery_mode is not CTDB_RECOVERY_NORMAL;
    - look up the database named by m->db_id;
    - allocate a struct ctdb_persistent_write_state and start the child
      through ctdb_childwrite() with ctdb_persistent_write_callback as
      the completion callback;
    - reparent c under state so it stays alive and arm a timer of
      ctdb->tunable.control_timeout (presumably seconds) that fires
      ctdb_persistent_lock_timeout.

  Parameters:
    ctdb    - daemon context
    c       - the incoming control; answered later by the callbacks
    recdata - interpreted as a struct ctdb_marshall_buffer

  NOTE(review): the end of the parameter list, all return statements and
  the assignments of state->m / state->c are not visible in this
  listing. recdata.dptr is cast and m->db_id is read with no size check
  visible here - confirm that recdata.dsize is validated somewhere. The
  return value of event_add_timed() is not checked in the visible lines.
*/
404 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
405 struct ctdb_req_control *c, TDB_DATA recdata,
408 struct ctdb_db_context *ctdb_db;
409 struct ctdb_persistent_write_state *state;
410 struct childwrite_handle *handle;
411 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
/* no local writes while a recovery is in progress */
413 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
414 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
418 ctdb_db = find_ctdb_db(ctdb, m->db_id);
419 if (ctdb_db == NULL) {
420 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
/* talloc() rather than talloc_zero(): members must be set explicitly */
424 state = talloc(ctdb, struct ctdb_persistent_write_state);
425 CTDB_NO_MEMORY(ctdb, state);
427 state->ctdb_db = ctdb_db;
431 /* create a child process to take out a transaction and
434 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
435 if (handle == NULL) {
436 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
441 /* we need to wait for the replies */
444 /* need to keep the control structure around */
445 talloc_steal(state, c);
447 /* but we won't wait forever */
448 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
449 ctdb_persistent_lock_timeout, state);
456 called when a client has finished a local commit in a transaction to
457 a persistent database
/*
  Control handler: the client reports that its local transaction commit
  finished. Logs an error if client->num_persistent_updates is already
  zero; a decrement of the counter is also present.

  NOTE(review): return statements are not visible in this listing. No
  NULL check on the result of ctdb_reqid_find() is visible here, unlike
  in ctdb_control_start_persistent_update() - confirm that client cannot
  be NULL at this point.
*/
459 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
460 struct ctdb_req_control *c)
462 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
/* the counter must have been raised by a preceding commit */
464 if (client->num_persistent_updates == 0) {
465 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
468 client->num_persistent_updates--;
474 called when a client gets an error committing its database
475 during a transaction commit
/*
  Control handler: the client reports an error while committing its
  transaction. Logs an error if client->num_persistent_updates is
  already zero; a decrement of the counter is also present. The handler
  then logs "Forcing recovery" and sets the daemon's recovery_mode to
  CTDB_RECOVERY_ACTIVE.

  NOTE(review): return statements are not visible in this listing. No
  NULL check on the result of ctdb_reqid_find() is visible here - confirm
  that client cannot be NULL at this point.
*/
477 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
478 struct ctdb_req_control *c)
480 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
482 if (client->num_persistent_updates == 0) {
483 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
486 client->num_persistent_updates--;
488 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
/* reached through the client's back pointer to the daemon context */
489 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
496 backwards compatibility:
498 start a persistent store operation. passing both the key, header and
499 data to the daemon. If the client disconnects before it has issued
500 a persistent_update call to the daemon we trigger a full recovery
501 to ensure the databases are brought back in sync.
502 for now we ignore the recdata that the client has passed to us.
/*
  Backwards-compatibility control handler: marks the start of a
  persistent update by incrementing client->num_persistent_updates.
  When the client cannot be found from c->client_id an error is logged.

  NOTE(review): the end of the parameter list and the return statements
  are not visible in this listing.
*/
504 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
505 struct ctdb_req_control *c,
508 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
510 if (client == NULL) {
511 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
515 client->num_persistent_updates++;
521 backwards compatibility:
523 called to tell ctdbd that it is no longer doing a persistent update
/*
  Backwards-compatibility control handler: the client is no longer doing
  a persistent update. Decrements client->num_persistent_updates when it
  is positive, so the counter never goes below zero here. When the
  client cannot be found from c->client_id an error is logged.

  NOTE(review): the end of the parameter list and the return statements
  are not visible in this listing.
*/
525 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
526 struct ctdb_req_control *c,
529 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
531 if (client == NULL) {
532 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
/* guard keeps the counter from underflowing */
536 if (client->num_persistent_updates > 0) {
537 client->num_persistent_updates--;
545 backwards compatibility:
547 single record variant of ctdb_control_trans2_commit for older clients
/*
  Backwards-compatibility control handler: accepts a single record in
  recdata (laid out as a struct ctdb_rec_data), wraps it into a
  one-record marshall buffer and hands it to
  ctdb_control_trans2_commit().

  Parameters:
    ctdb        - daemon context
    c           - the incoming control; also the talloc parent of the
                  marshall buffer
    recdata     - raw record: header followed by keylen key bytes and
                  datalen data bytes
    async_reply - passed through to ctdb_control_trans2_commit()

  Returns whatever ctdb_control_trans2_commit() returns on the path shown
  on the last line; the return for the bad-size case is not visible.

  NOTE(review): rec->keylen and rec->datalen are read for the size check
  before any lower bound on recdata.dsize is visible - confirm that
  recdata is guaranteed to hold at least the fixed header.
*/
549 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
550 struct ctdb_req_control *c,
551 TDB_DATA recdata, bool *async_reply)
553 struct ctdb_marshall_buffer *m;
554 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
/* the blob must be exactly header + key + data */
557 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
558 rec->keylen + rec->datalen) {
559 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* key and data point into the record, nothing is copied here */
563 key.dptr = &rec->data[0];
564 key.dsize = rec->keylen;
565 data.dptr = &rec->data[rec->keylen];
566 data.dsize = rec->datalen;
/* rec->reqid is passed for both the third and the fourth argument */
568 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
569 CTDB_NO_MEMORY(ctdb, m);
571 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);