77cff9c6e3211cb4fb15d3d02c53b00008cef79f
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
29 struct ctdb_persistent_state {
30         struct ctdb_context *ctdb;
31         struct ctdb_req_control *c;
32         const char *errormsg;
33         uint32_t num_pending;
34         int32_t status;
35 };
36
37 /*
38   called when a node has acknowledged a ctdb_control_update_record call
39  */
40 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
41                                      int32_t status, TDB_DATA data, 
42                                      const char *errormsg,
43                                      void *private_data)
44 {
45         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
46                                                               struct ctdb_persistent_state);
47
48         if (status != 0) {
49                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
50                          status, errormsg));
51                 state->status = status;
52                 state->errormsg = errormsg;
53         }
54         state->num_pending--;
55         if (state->num_pending == 0) {
56                 ctdb_request_control_reply(state->ctdb, state->c, NULL, state->status, state->errormsg);
57                 talloc_free(state);
58         }
59 }
60
61 /*
62   called if persistent store times out
63  */
64 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
65                                          struct timeval t, void *private_data)
66 {
67         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
68         
69         ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
70
71         talloc_free(state);
72 }
73
74 /*
75   store a set of persistent records - called from a ctdb client when it has updated
76   some records in a persistent database. The client will have the record
77   locked for the duration of this call. The client is the dmaster when 
78   this call is made
79  */
80 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
81                                    struct ctdb_req_control *c, 
82                                    TDB_DATA recdata, bool *async_reply)
83 {
84         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
85         struct ctdb_persistent_state *state;
86         int i;
87
88         if (client == NULL) {
89                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
90                 return -1;
91         }
92
93         /* handling num_persistent_updates is a bit strange - 
94            there are 3 cases
95              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
96                 They don't expect num_persistent_updates to be used at all
97
98              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
99                 this commit to then decrement it
100
101              3) new clients which use TRANS2 commit functions, and
102                 expect this function to increment the counter, and
103                 then have it decremented in ctdb_control_trans2_error
104                 or ctdb_control_trans2_finished
105         */
106         if (c->opcode == CTDB_CONTROL_PERSISTENT_STORE) {
107                 if (client->num_persistent_updates > 0) {
108                         client->num_persistent_updates--;
109                 }               
110         } else {
111                 client->num_persistent_updates++;
112         }
113
114         state = talloc_zero(ctdb, struct ctdb_persistent_state);
115         CTDB_NO_MEMORY(ctdb, state);
116
117         state->ctdb = ctdb;
118         state->c    = c;
119
120         for (i=0;i<ctdb->vnn_map->size;i++) {
121                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
122                 int ret;
123
124                 /* only send to active nodes */
125                 if (node->flags & NODE_FLAGS_INACTIVE) {
126                         continue;
127                 }
128
129                 /* don't send to ourselves */
130                 if (node->pnn == ctdb->pnn) {
131                         continue;
132                 }
133                 
134                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
135                                                c->client_id, 0, recdata, 
136                                                ctdb_persistent_callback, state);
137                 if (ret == -1) {
138                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
139                         talloc_free(state);
140                         return -1;
141                 }
142
143                 state->num_pending++;
144         }
145
146         if (state->num_pending == 0) {
147                 talloc_free(state);
148                 return 0;
149         }
150         
151         /* we need to wait for the replies */
152         *async_reply = true;
153
154         /* need to keep the control structure around */
155         talloc_steal(state, c);
156
157         /* but we won't wait forever */
158         event_add_timed(ctdb->ev, state, 
159                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
160                         ctdb_persistent_store_timeout, state);
161
162         return 0;
163 }
164
165
166 struct ctdb_persistent_write_state {
167         struct ctdb_db_context *ctdb_db;
168         struct ctdb_marshall_buffer *m;
169         struct ctdb_req_control *c;
170 };
171
172
173 /*
174   called from a child process to write the data
175  */
176 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
177 {
178         int ret, i;
179         struct ctdb_rec_data *rec = NULL;
180         struct ctdb_marshall_buffer *m = state->m;
181
182         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
183         if (ret == -1) {
184                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
185                                  state->ctdb_db->db_id));
186                 return -1;
187         }
188
189         for (i=0;i<m->count;i++) {
190                 struct ctdb_ltdb_header oldheader;
191                 struct ctdb_ltdb_header header;
192                 TDB_DATA key, data;
193
194                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
195                 
196                 if (rec == NULL) {
197                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
198                                          i, state->ctdb_db->db_id));
199                         goto failed;                    
200                 }
201
202                 /* fetch the old header and ensure the rsn is less than the new rsn */
203                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, NULL, NULL);
204                 if (ret != 0) {
205                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
206                                          state->ctdb_db->db_id));
207                         goto failed;
208                 }
209
210                 if (oldheader.rsn >= header.rsn) {
211                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
212                                           state->ctdb_db->db_id, 
213                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
214                         goto failed;
215                 }
216
217                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
218                 if (ret != 0) {
219                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
220                                           state->ctdb_db->db_id));
221                         return -1;
222                 }
223         }
224
225         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
226         if (ret == -1) {
227                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
228                                  state->ctdb_db->db_id));
229                 return -1;
230         }
231
232         return 0;
233         
234 failed:
235         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
236         return -1;
237 }
238
239
240 /*
241   called when we the child has completed the persistent write
242   on our behalf
243  */
244 static void ctdb_persistent_write_callback(int status, void *private_data)
245 {
246         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
247                                                                    struct ctdb_persistent_write_state);
248
249
250         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
251
252         talloc_free(state);
253 }
254
255 /*
256   called if our lockwait child times out
257  */
258 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
259                                          struct timeval t, void *private_data)
260 {
261         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
262                                                                    struct ctdb_persistent_write_state);
263         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
264         talloc_free(state);
265 }
266
267 struct childwrite_handle {
268         struct ctdb_context *ctdb;
269         struct ctdb_db_context *ctdb_db;
270         struct fd_event *fde;
271         int fd[2];
272         pid_t child;
273         void *private_data;
274         void (*callback)(int, void *);
275         struct timeval start_time;
276 };
277
278 static int childwrite_destructor(struct childwrite_handle *h)
279 {
280         h->ctdb->statistics.pending_childwrite_calls--;
281         kill(h->child, SIGKILL);
282         return 0;
283 }
284
285 /* called when the child process has finished writing the record to the
286    database
287 */
288 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
289                              uint16_t flags, void *private_data)
290 {
291         struct childwrite_handle *h = talloc_get_type(private_data, 
292                                                      struct childwrite_handle);
293         void *p = h->private_data;
294         void (*callback)(int, void *) = h->callback;
295         pid_t child = h->child;
296         TALLOC_CTX *tmp_ctx = talloc_new(ev);
297         int ret;
298         char c;
299
300         ctdb_latency(&h->ctdb->statistics.max_childwrite_latency, h->start_time);
301         h->ctdb->statistics.pending_childwrite_calls--;
302
303         /* the handle needs to go away when the context is gone - when
304            the handle goes away this implicitly closes the pipe, which
305            kills the child */
306         talloc_steal(tmp_ctx, h);
307
308         talloc_set_destructor(h, NULL);
309
310         ret = read(h->fd[0], &c, 1);
311         if (ret < 1) {
312                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
313                 c = 1;
314         }
315
316         callback(c, p);
317
318         kill(child, SIGKILL);
319         talloc_free(tmp_ctx);
320 }
321
322 /* this creates a child process which will take out a tdb transaction
323    and write the record to the database.
324 */
325 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
326                                 void (*callback)(int, void *private_data),
327                                 struct ctdb_persistent_write_state *state)
328 {
329         struct childwrite_handle *result;
330         int ret;
331         pid_t parent = getpid();
332
333         ctdb_db->ctdb->statistics.childwrite_calls++;
334         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
335
336         if (!(result = talloc_zero(state, struct childwrite_handle))) {
337                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
338                 return NULL;
339         }
340
341         ret = pipe(result->fd);
342
343         if (ret != 0) {
344                 talloc_free(result);
345                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
346                 return NULL;
347         }
348
349         result->child = fork();
350
351         if (result->child == (pid_t)-1) {
352                 close(result->fd[0]);
353                 close(result->fd[1]);
354                 talloc_free(result);
355                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
356                 return NULL;
357         }
358
359         result->callback = callback;
360         result->private_data = state;
361         result->ctdb = ctdb_db->ctdb;
362         result->ctdb_db = ctdb_db;
363
364         if (result->child == 0) {
365                 char c = 0;
366
367                 close(result->fd[0]);
368                 ret = ctdb_persistent_store(state);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
371                         c = 1;
372                 }
373
374                 write(result->fd[1], &c, 1);
375
376                 /* make sure we die when our parent dies */
377                 while (kill(parent, 0) == 0 || errno != ESRCH) {
378                         sleep(5);
379                 }
380                 _exit(0);
381         }
382
383         close(result->fd[1]);
384         talloc_set_destructor(result, childwrite_destructor);
385
386         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
387                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
388                                    (void *)result);
389         if (result->fde == NULL) {
390                 talloc_free(result);
391                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
392                 return NULL;
393         }
394
395         result->start_time = timeval_current();
396
397         return result;
398 }
399
400 /* 
401    update a record on this node if the new record has a higher rsn than the
402    current record
403  */
404 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
405                                    struct ctdb_req_control *c, TDB_DATA recdata, 
406                                    bool *async_reply)
407 {
408         struct ctdb_db_context *ctdb_db;
409         struct ctdb_persistent_write_state *state;
410         struct childwrite_handle *handle;
411         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
412
413         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
414                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
415                 return -1;
416         }
417
418         ctdb_db = find_ctdb_db(ctdb, m->db_id);
419         if (ctdb_db == NULL) {
420                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
421                 return -1;
422         }
423
424         state = talloc(ctdb, struct ctdb_persistent_write_state);
425         CTDB_NO_MEMORY(ctdb, state);
426
427         state->ctdb_db = ctdb_db;
428         state->c       = c;
429         state->m       = m;
430
431         /* create a child process to take out a transaction and 
432            write the data.
433         */
434         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
435         if (handle == NULL) {
436                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
437                 talloc_free(state);
438                 return -1;
439         }
440
441         /* we need to wait for the replies */
442         *async_reply = true;
443
444         /* need to keep the control structure around */
445         talloc_steal(state, c);
446
447         /* but we won't wait forever */
448         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
449                         ctdb_persistent_lock_timeout, state);
450
451         return 0;
452 }
453
454
455 /*
456   called when a client has finished a local commit in a transaction to 
457   a persistent database
458  */
459 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
460                                      struct ctdb_req_control *c)
461 {
462         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
463
464         if (client->num_persistent_updates == 0) {
465                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
466                 return -1;
467         }
468         client->num_persistent_updates--;
469
470         return 0;
471 }
472
473 /*
474   called when a client gets an error committing its database
475   during a transaction commit
476  */
477 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
478                                   struct ctdb_req_control *c)
479 {
480         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
481         
482         if (client->num_persistent_updates == 0) {
483                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
484                 return -1;
485         }
486         client->num_persistent_updates--;
487
488         DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
489         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
490
491         return 0;
492 }
493
494
495 /*
496   backwards compatibility:
497
498   start a persistent store operation. passing both the key, header and
499   data to the daemon. If the client disconnects before it has issued
500   a persistent_update call to the daemon we trigger a full recovery
501   to ensure the databases are brought back in sync.
502   for now we ignore the recdata that the client has passed to us.
503  */
504 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
505                                       struct ctdb_req_control *c,
506                                       TDB_DATA recdata)
507 {
508         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
509
510         if (client == NULL) {
511                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
512                 return -1;
513         }
514
515         client->num_persistent_updates++;
516
517         return 0;
518 }
519
520 /* 
521   backwards compatibility:
522
523   called to tell ctdbd that it is no longer doing a persistent update 
524 */
525 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
526                                               struct ctdb_req_control *c,
527                                               TDB_DATA recdata)
528 {
529         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
530
531         if (client == NULL) {
532                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
533                 return -1;
534         }
535
536         if (client->num_persistent_updates > 0) {
537                 client->num_persistent_updates--;
538         }
539
540         return 0;
541 }
542
543
544 /*
545   backwards compatibility:
546
547   single record varient of ctdb_control_trans2_commit for older clients
548  */
549 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
550                                       struct ctdb_req_control *c, 
551                                       TDB_DATA recdata, bool *async_reply)
552 {
553         struct ctdb_marshall_buffer *m;
554         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
555         TDB_DATA key, data;
556
557         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
558             rec->keylen + rec->datalen) {
559                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
560                 return -1;
561         }
562
563         key.dptr = &rec->data[0];
564         key.dsize = rec->keylen;
565         data.dptr = &rec->data[rec->keylen];
566         data.dsize = rec->datalen;
567
568         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
569         CTDB_NO_MEMORY(ctdb, m);
570
571         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
572 }
573
574