ctdb-recovery-helper: Introduce push database abstraction
[sfrench/samba-autobuild/.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/time.h"
31 #include "lib/util/tevent_unix.h"
32
33 #include "protocol/protocol.h"
34 #include "protocol/protocol_api.h"
35 #include "client/client.h"
36
/*
 * Timeout value handed to timeval_current_ofs() by TIMEOUT().
 * NOTE(review): presumably a number of seconds - confirm against
 * lib/util/time.h.
 */
static int recover_timeout = 120;

/* Timeout argument used for the controls sent in this file */
#define TIMEOUT()       timeval_current_ofs(recover_timeout, 0)
40
/*
 * Write a printf-style formatted message to stderr.
 */
static void LOG(const char *fmt, ...)
{
        va_list args;

        va_start(args, fmt);
        vfprintf(stderr, fmt, args);
        va_end(args);
}
49
50 /*
51  * Utility functions
52  */
53
/*
 * write(2) wrapper that repeats the call for as long as it fails with
 * EINTR, EAGAIN or (where the platform defines it) EWOULDBLOCK.
 *
 * Returns the result of the last write() call: the number of bytes
 * written, or -1 with errno set for any other error.
 */
static ssize_t sys_write(int fd, const void *buf, size_t count)
{
        for (;;) {
                ssize_t nwritten = write(fd, buf, count);

                if (nwritten != -1) {
                        return nwritten;
                }

                if (errno == EINTR || errno == EAGAIN) {
                        continue;
                }
#if defined(EWOULDBLOCK)
                if (errno == EWOULDBLOCK) {
                        continue;
                }
#endif
                return nwritten;
        }
}
67
/*
 * Shared "recv" helper for the tevent requests in this file.
 *
 * Returns true if the request did not finish with a unix error.
 * Otherwise returns false and, when perr is not NULL, stores the error
 * code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
81
82 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
83
84 static uint64_t srvid_next(void)
85 {
86         rec_srvid += 1;
87         return rec_srvid;
88 }
89
90 /*
91  * Recovery database functions
92  */
93
94 struct recdb_context {
95         uint32_t db_id;
96         const char *db_name;
97         const char *db_path;
98         struct tdb_wrap *db;
99         bool persistent;
100 };
101
102 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
103                                           const char *db_name,
104                                           const char *db_path,
105                                           uint32_t hash_size, bool persistent)
106 {
107         static char *db_dir_state = NULL;
108         struct recdb_context *recdb;
109         unsigned int tdb_flags;
110
111         recdb = talloc(mem_ctx, struct recdb_context);
112         if (recdb == NULL) {
113                 return NULL;
114         }
115
116         if (db_dir_state == NULL) {
117                 db_dir_state = getenv("CTDB_DBDIR_STATE");
118         }
119
120         recdb->db_name = db_name;
121         recdb->db_id = db_id;
122         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
123                                          db_dir_state != NULL ?
124                                             db_dir_state :
125                                             dirname(discard_const(db_path)),
126                                          db_name);
127         if (recdb->db_path == NULL) {
128                 talloc_free(recdb);
129                 return NULL;
130         }
131         unlink(recdb->db_path);
132
133         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
134         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
135                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
136         if (recdb->db == NULL) {
137                 talloc_free(recdb);
138                 LOG("failed to create recovery db %s\n", recdb->db_path);
139         }
140
141         recdb->persistent = persistent;
142
143         return recdb;
144 }
145
146 static uint32_t recdb_id(struct recdb_context *recdb)
147 {
148         return recdb->db_id;
149 }
150
151 static const char *recdb_name(struct recdb_context *recdb)
152 {
153         return recdb->db_name;
154 }
155
156 static const char *recdb_path(struct recdb_context *recdb)
157 {
158         return recdb->db_path;
159 }
160
161 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
162 {
163         return recdb->db->tdb;
164 }
165
166 static bool recdb_persistent(struct recdb_context *recdb)
167 {
168         return recdb->persistent;
169 }
170
171 struct recdb_add_traverse_state {
172         struct recdb_context *recdb;
173         int mypnn;
174 };
175
176 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
177                               TDB_DATA key, TDB_DATA data,
178                               void *private_data)
179 {
180         struct recdb_add_traverse_state *state =
181                 (struct recdb_add_traverse_state *)private_data;
182         struct ctdb_ltdb_header *hdr;
183         TDB_DATA prev_data;
184         int ret;
185
186         /* header is not marshalled separately in the pulldb control */
187         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
188                 return -1;
189         }
190
191         hdr = (struct ctdb_ltdb_header *)data.dptr;
192
193         /* fetch the existing record, if any */
194         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
195
196         if (prev_data.dptr != NULL) {
197                 struct ctdb_ltdb_header prev_hdr;
198
199                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
200                 free(prev_data.dptr);
201                 if (hdr->rsn < prev_hdr.rsn ||
202                     (hdr->rsn == prev_hdr.rsn &&
203                      prev_hdr.dmaster != state->mypnn)) {
204                         return 0;
205                 }
206         }
207
208         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
209         if (ret != 0) {
210                 return -1;
211         }
212         return 0;
213 }
214
215 static bool recdb_add(struct recdb_context *recdb, int mypnn,
216                       struct ctdb_rec_buffer *recbuf)
217 {
218         struct recdb_add_traverse_state state;
219         int ret;
220
221         state.recdb = recdb;
222         state.mypnn = mypnn;
223
224         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
225         if (ret != 0) {
226                 return false;
227         }
228
229         return true;
230 }
231
232 /* This function decides which records from recdb are retained */
233 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
234                              uint32_t reqid, uint32_t dmaster,
235                              TDB_DATA key, TDB_DATA data)
236 {
237         struct ctdb_ltdb_header *header;
238         int ret;
239
240         /*
241          * skip empty records - but NOT for persistent databases:
242          *
243          * The record-by-record mode of recovery deletes empty records.
244          * For persistent databases, this can lead to data corruption
245          * by deleting records that should be there:
246          *
247          * - Assume the cluster has been running for a while.
248          *
249          * - A record R in a persistent database has been created and
250          *   deleted a couple of times, the last operation being deletion,
251          *   leaving an empty record with a high RSN, say 10.
252          *
253          * - Now a node N is turned off.
254          *
255          * - This leaves the local database copy of D on N with the empty
256          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
257          *   the copy of record R.
258          *
259          * - Now the record is created again while node N is turned off.
260          *   This creates R with RSN = 1 on all nodes except for N.
261          *
262          * - Now node N is turned on again. The following recovery will chose
263          *   the older empty copy of R due to RSN 10 > RSN 1.
264          *
265          * ==> Hence the record is gone after the recovery.
266          *
267          * On databases like Samba's registry, this can damage the higher-level
268          * data structures built from the various tdb-level records.
269          */
270         if (!persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
271                 return 0;
272         }
273
274         /* update the dmaster field to point to us */
275         header = (struct ctdb_ltdb_header *)data.dptr;
276         if (!persistent) {
277                 header->dmaster = dmaster;
278                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
279         }
280
281         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
282         if (ret != 0) {
283                 return ret;
284         }
285
286         return 0;
287 }
288
289 struct recdb_records_traverse_state {
290         struct ctdb_rec_buffer *recbuf;
291         uint32_t dmaster;
292         uint32_t reqid;
293         bool persistent;
294         bool failed;
295 };
296
297 static int recdb_records_traverse(struct tdb_context *tdb,
298                                   TDB_DATA key, TDB_DATA data,
299                                   void *private_data)
300 {
301         struct recdb_records_traverse_state *state =
302                 (struct recdb_records_traverse_state *)private_data;
303         int ret;
304
305         ret = recbuf_filter_add(state->recbuf, state->persistent,
306                                 state->reqid, state->dmaster, key, data);
307         if (ret != 0) {
308                 state->failed = true;
309                 return ret;
310         }
311
312         return 0;
313 }
314
315 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
316                                              TALLOC_CTX *mem_ctx,
317                                              uint32_t dmaster)
318 {
319         struct recdb_records_traverse_state state;
320         int ret;
321
322         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
323         if (state.recbuf == NULL) {
324                 return NULL;
325         }
326         state.dmaster = dmaster;
327         state.reqid = 0;
328         state.persistent = recdb_persistent(recdb);
329         state.failed = false;
330
331         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
332                                 &state);
333         if (ret == -1 || state.failed) {
334                 LOG("Failed to marshall recovery records for %s\n",
335                     recdb_name(recdb));
336                 TALLOC_FREE(state.recbuf);
337                 return NULL;
338         }
339
340         return state.recbuf;
341 }
342
343 struct recdb_file_traverse_state {
344         struct ctdb_rec_buffer *recbuf;
345         struct recdb_context *recdb;
346         TALLOC_CTX *mem_ctx;
347         uint32_t dmaster;
348         uint32_t reqid;
349         bool persistent;
350         bool failed;
351         int fd;
352         int max_size;
353         int num_buffers;
354 };
355
356 static int recdb_file_traverse(struct tdb_context *tdb,
357                                TDB_DATA key, TDB_DATA data,
358                                void *private_data)
359 {
360         struct recdb_file_traverse_state *state =
361                 (struct recdb_file_traverse_state *)private_data;
362         int ret;
363
364         ret = recbuf_filter_add(state->recbuf, state->persistent,
365                                 state->reqid, state->dmaster, key, data);
366         if (ret != 0) {
367                 state->failed = true;
368                 return ret;
369         }
370
371         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
372                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
373                 if (ret != 0) {
374                         LOG("Failed to collect recovery records for %s\n",
375                             recdb_name(state->recdb));
376                         state->failed = true;
377                         return ret;
378                 }
379
380                 state->num_buffers += 1;
381
382                 TALLOC_FREE(state->recbuf);
383                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
384                                                      recdb_id(state->recdb));
385                 if (state->recbuf == NULL) {
386                         state->failed = true;
387                         return ENOMEM;
388                 }
389         }
390
391         return 0;
392 }
393
394 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
395                       uint32_t dmaster, int fd, int max_size)
396 {
397         struct recdb_file_traverse_state state;
398         int ret;
399
400         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
401         if (state.recbuf == NULL) {
402                 return -1;
403         }
404         state.recdb = recdb;
405         state.mem_ctx = mem_ctx;
406         state.dmaster = dmaster;
407         state.reqid = 0;
408         state.persistent = recdb_persistent(recdb);
409         state.failed = false;
410         state.fd = fd;
411         state.max_size = max_size;
412         state.num_buffers = 0;
413
414         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
415         if (ret == -1 || state.failed) {
416                 TALLOC_FREE(state.recbuf);
417                 return -1;
418         }
419
420         ret = ctdb_rec_buffer_write(state.recbuf, fd);
421         if (ret != 0) {
422                 LOG("Failed to collect recovery records for %s\n",
423                     recdb_name(recdb));
424                 TALLOC_FREE(state.recbuf);
425                 return -1;
426         }
427         state.num_buffers += 1;
428
429         LOG("Wrote %d buffers of recovery records for %s\n",
430             state.num_buffers, recdb_name(recdb));
431
432         return state.num_buffers;
433 }
434
435 /*
436  * Pull database from a single node
437  */
438
439 struct pull_database_state {
440         struct tevent_context *ev;
441         struct ctdb_client_context *client;
442         struct recdb_context *recdb;
443         uint32_t pnn;
444         uint64_t srvid;
445         int num_records;
446 };
447
448 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
449                                   void *private_data);
450 static void pull_database_register_done(struct tevent_req *subreq);
451 static void pull_database_old_done(struct tevent_req *subreq);
452 static void pull_database_unregister_done(struct tevent_req *subreq);
453 static void pull_database_new_done(struct tevent_req *subreq);
454
455 static struct tevent_req *pull_database_send(
456                         TALLOC_CTX *mem_ctx,
457                         struct tevent_context *ev,
458                         struct ctdb_client_context *client,
459                         uint32_t pnn, uint32_t caps,
460                         struct recdb_context *recdb)
461 {
462         struct tevent_req *req, *subreq;
463         struct pull_database_state *state;
464         struct ctdb_req_control request;
465
466         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
467         if (req == NULL) {
468                 return NULL;
469         }
470
471         state->ev = ev;
472         state->client = client;
473         state->recdb = recdb;
474         state->pnn = pnn;
475         state->srvid = srvid_next();
476
477         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
478                 subreq = ctdb_client_set_message_handler_send(
479                                         state, state->ev, state->client,
480                                         state->srvid, pull_database_handler,
481                                         req);
482                 if (tevent_req_nomem(subreq, req)) {
483                         return tevent_req_post(req, ev);
484                 }
485
486                 tevent_req_set_callback(subreq, pull_database_register_done,
487                                         req);
488
489         } else {
490                 struct ctdb_pulldb pulldb;
491
492                 pulldb.db_id = recdb_id(recdb);
493                 pulldb.lmaster = CTDB_LMASTER_ANY;
494
495                 ctdb_req_control_pull_db(&request, &pulldb);
496                 subreq = ctdb_client_control_send(state, state->ev,
497                                                   state->client,
498                                                   pnn, TIMEOUT(),
499                                                   &request);
500                 if (tevent_req_nomem(subreq, req)) {
501                         return tevent_req_post(req, ev);
502                 }
503                 tevent_req_set_callback(subreq, pull_database_old_done, req);
504         }
505
506         return req;
507 }
508
509 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
510                                   void *private_data)
511 {
512         struct tevent_req *req = talloc_get_type_abort(
513                 private_data, struct tevent_req);
514         struct pull_database_state *state = tevent_req_data(
515                 req, struct pull_database_state);
516         struct ctdb_rec_buffer *recbuf;
517         int ret;
518         bool status;
519
520         if (srvid != state->srvid) {
521                 return;
522         }
523
524         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
525         if (ret != 0) {
526                 LOG("Invalid data received for DB_PULL messages\n");
527                 return;
528         }
529
530         if (recbuf->db_id != recdb_id(state->recdb)) {
531                 talloc_free(recbuf);
532                 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
533                     recbuf->db_id, recdb_name(state->recdb));
534                 return;
535         }
536
537         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
538                            recbuf);
539         if (! status) {
540                 talloc_free(recbuf);
541                 LOG("Failed to add records to recdb for %s\n",
542                     recdb_name(state->recdb));
543                 return;
544         }
545
546         state->num_records += recbuf->count;
547         talloc_free(recbuf);
548 }
549
550 static void pull_database_register_done(struct tevent_req *subreq)
551 {
552         struct tevent_req *req = tevent_req_callback_data(
553                 subreq, struct tevent_req);
554         struct pull_database_state *state = tevent_req_data(
555                 req, struct pull_database_state);
556         struct ctdb_req_control request;
557         struct ctdb_pulldb_ext pulldb_ext;
558         int ret;
559         bool status;
560
561         status = ctdb_client_set_message_handler_recv(subreq, &ret);
562         TALLOC_FREE(subreq);
563         if (! status) {
564                 LOG("failed to set message handler for DB_PULL for %s\n",
565                     recdb_name(state->recdb));
566                 tevent_req_error(req, ret);
567                 return;
568         }
569
570         pulldb_ext.db_id = recdb_id(state->recdb);
571         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
572         pulldb_ext.srvid = state->srvid;
573
574         ctdb_req_control_db_pull(&request, &pulldb_ext);
575         subreq = ctdb_client_control_send(state, state->ev, state->client,
576                                           state->pnn, TIMEOUT(), &request);
577         if (tevent_req_nomem(subreq, req)) {
578                 return;
579         }
580         tevent_req_set_callback(subreq, pull_database_new_done, req);
581 }
582
583 static void pull_database_old_done(struct tevent_req *subreq)
584 {
585         struct tevent_req *req = tevent_req_callback_data(
586                 subreq, struct tevent_req);
587         struct pull_database_state *state = tevent_req_data(
588                 req, struct pull_database_state);
589         struct ctdb_reply_control *reply;
590         struct ctdb_rec_buffer *recbuf;
591         int ret;
592         bool status;
593
594         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
595         TALLOC_FREE(subreq);
596         if (! status) {
597                 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
598                     recdb_name(state->recdb), state->pnn, ret);
599                 tevent_req_error(req, ret);
600                 return;
601         }
602
603         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
604         talloc_free(reply);
605         if (ret != 0) {
606                 tevent_req_error(req, ret);
607                 return;
608         }
609
610         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
611                            recbuf);
612         if (! status) {
613                 talloc_free(recbuf);
614                 tevent_req_error(req, EIO);
615                 return;
616         }
617
618         state->num_records = recbuf->count;
619         talloc_free(recbuf);
620
621         LOG("Pulled %d records for db %s from node %d\n",
622             state->num_records, recdb_name(state->recdb), state->pnn);
623
624         tevent_req_done(req);
625 }
626
627 static void pull_database_new_done(struct tevent_req *subreq)
628 {
629         struct tevent_req *req = tevent_req_callback_data(
630                 subreq, struct tevent_req);
631         struct pull_database_state *state = tevent_req_data(
632                 req, struct pull_database_state);
633         struct ctdb_reply_control *reply;
634         uint32_t num_records;
635         int ret;
636         bool status;
637
638         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
639         TALLOC_FREE(subreq);
640         if (! status) {
641                 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
642                     recdb_name(state->recdb), state->pnn, ret);
643                 tevent_req_error(req, ret);
644                 return;
645         }
646
647         ret = ctdb_reply_control_db_pull(reply, &num_records);
648         talloc_free(reply);
649         if (num_records != state->num_records) {
650                 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
651                     num_records, state->num_records, recdb_name(state->recdb));
652                 tevent_req_error(req, EIO);
653                 return;
654         }
655
656         LOG("Pulled %d records for db %s from node %d\n",
657             state->num_records, recdb_name(state->recdb), state->pnn);
658
659         subreq = ctdb_client_remove_message_handler_send(
660                                         state, state->ev, state->client,
661                                         state->srvid, req);
662         if (tevent_req_nomem(subreq, req)) {
663                 return;
664         }
665         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
666 }
667
668 static void pull_database_unregister_done(struct tevent_req *subreq)
669 {
670         struct tevent_req *req = tevent_req_callback_data(
671                 subreq, struct tevent_req);
672         struct pull_database_state *state = tevent_req_data(
673                 req, struct pull_database_state);
674         int ret;
675         bool status;
676
677         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
678         TALLOC_FREE(subreq);
679         if (! status) {
680                 LOG("failed to remove message handler for DB_PULL for %s\n",
681                     recdb_name(state->recdb));
682                 tevent_req_error(req, ret);
683                 return;
684         }
685
686         tevent_req_done(req);
687 }
688
/*
 * Collect the result of a pull_database request; see generic_recv()
 * for the meaning of the return value and of perr.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
693
694 /*
695  * Push database to specified nodes (old style)
696  */
697
698 struct push_database_old_state {
699         struct tevent_context *ev;
700         struct ctdb_client_context *client;
701         struct recdb_context *recdb;
702         uint32_t *pnn_list;
703         int count;
704         struct ctdb_rec_buffer *recbuf;
705         int index;
706 };
707
708 static void push_database_old_push_done(struct tevent_req *subreq);
709
710 static struct tevent_req *push_database_old_send(
711                         TALLOC_CTX *mem_ctx,
712                         struct tevent_context *ev,
713                         struct ctdb_client_context *client,
714                         uint32_t *pnn_list, int count,
715                         struct recdb_context *recdb)
716 {
717         struct tevent_req *req, *subreq;
718         struct push_database_old_state *state;
719         struct ctdb_req_control request;
720         uint32_t pnn;
721
722         req = tevent_req_create(mem_ctx, &state,
723                                 struct push_database_old_state);
724         if (req == NULL) {
725                 return NULL;
726         }
727
728         state->ev = ev;
729         state->client = client;
730         state->recdb = recdb;
731         state->pnn_list = pnn_list;
732         state->count = count;
733         state->index = 0;
734
735         state->recbuf = recdb_records(recdb, state,
736                                       ctdb_client_pnn(client));
737         if (tevent_req_nomem(state->recbuf, req)) {
738                 return tevent_req_post(req, ev);
739         }
740
741         pnn = state->pnn_list[state->index];
742
743         ctdb_req_control_push_db(&request, state->recbuf);
744         subreq = ctdb_client_control_send(state, ev, client, pnn,
745                                           TIMEOUT(), &request);
746         if (tevent_req_nomem(subreq, req)) {
747                 return tevent_req_post(req, ev);
748         }
749         tevent_req_set_callback(subreq, push_database_old_push_done, req);
750
751         return req;
752 }
753
754 static void push_database_old_push_done(struct tevent_req *subreq)
755 {
756         struct tevent_req *req = tevent_req_callback_data(
757                 subreq, struct tevent_req);
758         struct push_database_old_state *state = tevent_req_data(
759                 req, struct push_database_old_state);
760         struct ctdb_req_control request;
761         uint32_t pnn;
762         int ret;
763         bool status;
764
765         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
766         TALLOC_FREE(subreq);
767         if (! status) {
768                 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
769                     recdb_name(state->recdb), state->pnn_list[state->index],
770                     ret);
771                 tevent_req_error(req, ret);
772                 return;
773         }
774
775         state->index += 1;
776         if (state->index == state->count) {
777                 TALLOC_FREE(state->recbuf);
778                 tevent_req_done(req);
779                 return;
780         }
781
782         pnn = state->pnn_list[state->index];
783
784         ctdb_req_control_push_db(&request, state->recbuf);
785         subreq = ctdb_client_control_send(state, state->ev, state->client,
786                                           pnn, TIMEOUT(), &request);
787         if (tevent_req_nomem(subreq, req)) {
788                 return;
789         }
790         tevent_req_set_callback(subreq, push_database_old_push_done, req);
791 }
792
/*
 * Collect the result of an old-style push_database request; see
 * generic_recv() for the meaning of the return value and of perr.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
797
798 /*
799  * Push database to specified nodes (new style)
800  */
801
802 struct push_database_new_state {
803         struct tevent_context *ev;
804         struct ctdb_client_context *client;
805         struct recdb_context *recdb;
806         uint32_t *pnn_list;
807         int count;
808         uint64_t srvid;
809         uint32_t dmaster;
810         int fd;
811         int num_buffers;
812         int num_buffers_sent;
813         int num_records;
814 };
815
816 static void push_database_new_started(struct tevent_req *subreq);
817 static void push_database_new_send_msg(struct tevent_req *req);
818 static void push_database_new_send_done(struct tevent_req *subreq);
819 static void push_database_new_confirmed(struct tevent_req *subreq);
820
821 static struct tevent_req *push_database_new_send(
822                         TALLOC_CTX *mem_ctx,
823                         struct tevent_context *ev,
824                         struct ctdb_client_context *client,
825                         uint32_t *pnn_list, int count,
826                         struct recdb_context *recdb,
827                         int max_size)
828 {
829         struct tevent_req *req, *subreq;
830         struct push_database_new_state *state;
831         struct ctdb_req_control request;
832         struct ctdb_pulldb_ext pulldb_ext;
833         char *filename;
834         off_t offset;
835
836         req = tevent_req_create(mem_ctx, &state,
837                                 struct push_database_new_state);
838         if (req == NULL) {
839                 return NULL;
840         }
841
842         state->ev = ev;
843         state->client = client;
844         state->recdb = recdb;
845         state->pnn_list = pnn_list;
846         state->count = count;
847
848         state->srvid = srvid_next();
849         state->dmaster = ctdb_client_pnn(client);
850         state->num_buffers_sent = 0;
851         state->num_records = 0;
852
853         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
854         if (tevent_req_nomem(filename, req)) {
855                 return tevent_req_post(req, ev);
856         }
857
858         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
859         if (state->fd == -1) {
860                 tevent_req_error(req, errno);
861                 return tevent_req_post(req, ev);
862         }
863         unlink(filename);
864         talloc_free(filename);
865
866         state->num_buffers = recdb_file(recdb, state, state->dmaster,
867                                         state->fd, max_size);
868         if (state->num_buffers == -1) {
869                 tevent_req_error(req, ENOMEM);
870                 return tevent_req_post(req, ev);
871         }
872
873         offset = lseek(state->fd, 0, SEEK_SET);
874         if (offset != 0) {
875                 tevent_req_error(req, EIO);
876                 return tevent_req_post(req, ev);
877         }
878
879         pulldb_ext.db_id = recdb_id(recdb);
880         pulldb_ext.srvid = state->srvid;
881
882         ctdb_req_control_db_push_start(&request, &pulldb_ext);
883         subreq = ctdb_client_control_multi_send(state, ev, client,
884                                                 pnn_list, count,
885                                                 TIMEOUT(), &request);
886         if (tevent_req_nomem(subreq, req)) {
887                 return tevent_req_post(req, ev);
888         }
889         tevent_req_set_callback(subreq, push_database_new_started, req);
890
891         return req;
892 }
893
894 static void push_database_new_started(struct tevent_req *subreq)
895 {
896         struct tevent_req *req = tevent_req_callback_data(
897                 subreq, struct tevent_req);
898         struct push_database_new_state *state = tevent_req_data(
899                 req, struct push_database_new_state);
900         int *err_list;
901         int ret;
902         bool status;
903
904         status = ctdb_client_control_multi_recv(subreq, &ret, state,
905                                                 &err_list, NULL);
906         TALLOC_FREE(subreq);
907         if (! status) {
908                 int ret2;
909                 uint32_t pnn;
910
911                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
912                                                        state->count,
913                                                        err_list, &pnn);
914                 if (ret2 != 0) {
915                         LOG("control DB_PUSH_START failed for db %s "
916                             "on node %u, ret=%d\n",
917                             recdb_name(state->recdb), pnn, ret2);
918                 } else {
919                         LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
920                             recdb_name(state->recdb), ret);
921                 }
922                 talloc_free(err_list);
923
924                 tevent_req_error(req, ret);
925                 return;
926         }
927
928         push_database_new_send_msg(req);
929 }
930
931 static void push_database_new_send_msg(struct tevent_req *req)
932 {
933         struct push_database_new_state *state = tevent_req_data(
934                 req, struct push_database_new_state);
935         struct tevent_req *subreq;
936         struct ctdb_rec_buffer *recbuf;
937         struct ctdb_req_message message;
938         TDB_DATA data;
939         int ret;
940
941         if (state->num_buffers_sent == state->num_buffers) {
942                 struct ctdb_req_control request;
943
944                 ctdb_req_control_db_push_confirm(&request,
945                                                  recdb_id(state->recdb));
946                 subreq = ctdb_client_control_multi_send(state, state->ev,
947                                                         state->client,
948                                                         state->pnn_list,
949                                                         state->count,
950                                                         TIMEOUT(), &request);
951                 if (tevent_req_nomem(subreq, req)) {
952                         return;
953                 }
954                 tevent_req_set_callback(subreq, push_database_new_confirmed,
955                                         req);
956                 return;
957         }
958
959         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
960         if (ret != 0) {
961                 tevent_req_error(req, ret);
962                 return;
963         }
964
965         data.dsize = ctdb_rec_buffer_len(recbuf);
966         data.dptr = talloc_size(state, data.dsize);
967         if (tevent_req_nomem(data.dptr, req)) {
968                 return;
969         }
970
971         ctdb_rec_buffer_push(recbuf, data.dptr);
972
973         message.srvid = state->srvid;
974         message.data.data = data;
975
976         LOG("Pushing buffer %d with %d records for %s\n",
977             state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
978
979         subreq = ctdb_client_message_multi_send(state, state->ev,
980                                                 state->client,
981                                                 state->pnn_list, state->count,
982                                                 &message);
983         if (tevent_req_nomem(subreq, req)) {
984                 return;
985         }
986         tevent_req_set_callback(subreq, push_database_new_send_done, req);
987
988         state->num_records += recbuf->count;
989
990         talloc_free(data.dptr);
991         talloc_free(recbuf);
992 }
993
994 static void push_database_new_send_done(struct tevent_req *subreq)
995 {
996         struct tevent_req *req = tevent_req_callback_data(
997                 subreq, struct tevent_req);
998         struct push_database_new_state *state = tevent_req_data(
999                 req, struct push_database_new_state);
1000         bool status;
1001         int ret;
1002
1003         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1004         TALLOC_FREE(subreq);
1005         if (! status) {
1006                 LOG("Sending recovery records failed for %s\n",
1007                     recdb_name(state->recdb));
1008                 tevent_req_error(req, ret);
1009                 return;
1010         }
1011
1012         state->num_buffers_sent += 1;
1013
1014         push_database_new_send_msg(req);
1015 }
1016
1017 static void push_database_new_confirmed(struct tevent_req *subreq)
1018 {
1019         struct tevent_req *req = tevent_req_callback_data(
1020                 subreq, struct tevent_req);
1021         struct push_database_new_state *state = tevent_req_data(
1022                 req, struct push_database_new_state);
1023         struct ctdb_reply_control **reply;
1024         int *err_list;
1025         bool status;
1026         int ret, i;
1027         uint32_t num_records;
1028
1029         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1030                                                 &err_list, &reply);
1031         TALLOC_FREE(subreq);
1032         if (! status) {
1033                 int ret2;
1034                 uint32_t pnn;
1035
1036                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1037                                                        state->count, err_list,
1038                                                        &pnn);
1039                 if (ret2 != 0) {
1040                         LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1041                             " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1042                 } else {
1043                         LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1044                             recdb_name(state->recdb), ret);
1045                 }
1046                 tevent_req_error(req, ret);
1047                 return;
1048         }
1049
1050         for (i=0; i<state->count; i++) {
1051                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1052                                                          &num_records);
1053                 if (ret != 0) {
1054                         tevent_req_error(req, EPROTO);
1055                         return;
1056                 }
1057
1058                 if (num_records != state->num_records) {
1059                         LOG("Node %u received %d of %d records for %s\n",
1060                             state->pnn_list[i], num_records,
1061                             state->num_records, recdb_name(state->recdb));
1062                         tevent_req_error(req, EPROTO);
1063                         return;
1064                 }
1065         }
1066
1067         talloc_free(reply);
1068
1069         LOG("Pushed %d records for db %s\n",
1070             state->num_records, recdb_name(state->recdb));
1071
1072         tevent_req_done(req);
1073 }
1074
/*
 * Collect the result of push_database_new_send().
 * Delegates to generic_recv() and returns its result unchanged.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
        bool status = generic_recv(req, perr);

        return status;
}
1079
1080 /*
1081  * wrapper for push_database_old and push_database_new
1082  */
1083
/* Tracks completion of the two push subrequests */
struct push_database_state {
        /* push_database_old finished, or was not needed */
        bool old_done;
        /* push_database_new finished, or was not needed */
        bool new_done;
};
1087
/* Completion callbacks for the push_database_old and push_database_new subrequests */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1090
1091 static struct tevent_req *push_database_send(
1092                         TALLOC_CTX *mem_ctx,
1093                         struct tevent_context *ev,
1094                         struct ctdb_client_context *client,
1095                         uint32_t *pnn_list, int count, uint32_t *caps,
1096                         struct ctdb_tunable_list *tun_list,
1097                         struct recdb_context *recdb)
1098 {
1099         struct tevent_req *req, *subreq;
1100         struct push_database_state *state;
1101         uint32_t *old_list, *new_list;
1102         int old_count, new_count;
1103         int i;
1104
1105         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1106         if (req == NULL) {
1107                 return NULL;
1108         }
1109
1110         state->old_done = false;
1111         state->new_done = false;
1112
1113         old_count = 0;
1114         new_count = 0;
1115         old_list = talloc_array(state, uint32_t, count);
1116         new_list = talloc_array(state, uint32_t, count);
1117         if (tevent_req_nomem(old_list, req) ||
1118             tevent_req_nomem(new_list,req)) {
1119                 return tevent_req_post(req, ev);
1120         }
1121
1122         for (i=0; i<count; i++) {
1123                 uint32_t pnn = pnn_list[i];
1124
1125                 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1126                         new_list[new_count] = pnn;
1127                         new_count += 1;
1128                 } else {
1129                         old_list[old_count] = pnn;
1130                         old_count += 1;
1131                 }
1132         }
1133
1134         if (old_count > 0) {
1135                 subreq = push_database_old_send(state, ev, client,
1136                                                 old_list, old_count, recdb);
1137                 if (tevent_req_nomem(subreq, req)) {
1138                         return tevent_req_post(req, ev);
1139                 }
1140                 tevent_req_set_callback(subreq, push_database_old_done, req);
1141         } else {
1142                 state->old_done = true;
1143         }
1144
1145         if (new_count > 0) {
1146                 subreq = push_database_new_send(state, ev, client,
1147                                                 new_list, new_count, recdb,
1148                                                 tun_list->rec_buffer_size_limit);
1149                 if (tevent_req_nomem(subreq, req)) {
1150                         return tevent_req_post(req, ev);
1151                 }
1152                 tevent_req_set_callback(subreq, push_database_new_done, req);
1153         } else {
1154                 state->new_done = true;
1155         }
1156
1157         return req;
1158 }
1159
1160 static void push_database_old_done(struct tevent_req *subreq)
1161 {
1162         struct tevent_req *req = tevent_req_callback_data(
1163                 subreq, struct tevent_req);
1164         struct push_database_state *state = tevent_req_data(
1165                 req, struct push_database_state);
1166         bool status;
1167         int ret;
1168
1169         status = push_database_old_recv(subreq, &ret);
1170         if (! status) {
1171                 tevent_req_error(req, ret);
1172                 return;
1173         }
1174
1175         state->old_done = true;
1176
1177         if (state->old_done && state->new_done) {
1178                 tevent_req_done(req);
1179         }
1180 }
1181
1182 static void push_database_new_done(struct tevent_req *subreq)
1183 {
1184         struct tevent_req *req = tevent_req_callback_data(
1185                 subreq, struct tevent_req);
1186         struct push_database_state *state = tevent_req_data(
1187                 req, struct push_database_state);
1188         bool status;
1189         int ret;
1190
1191         status = push_database_new_recv(subreq, &ret);
1192         if (! status) {
1193                 tevent_req_error(req, ret);
1194                 return;
1195         }
1196
1197         state->new_done = true;
1198
1199         if (state->old_done && state->new_done) {
1200                 tevent_req_done(req);
1201         }
1202 }
1203
/*
 * Collect the result of push_database_send().
 * Delegates to generic_recv() and returns its result unchanged.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
        bool status = generic_recv(req, perr);

        return status;
}
1208
1209 /*
1210  * Collect databases using highest sequence number
1211  */
1212
/* Request state for collect_highseqnum_db_send() */
struct collect_highseqnum_db_state {
        /* event context and client connection used for subrequests */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* nodes asked for their sequence number, and how many there are */
        uint32_t *pnn_list;
        int count;
        /* capability flags, looked up by node number when pulling */
        uint32_t *caps;
        /* id of the database whose sequence number is queried */
        uint32_t db_id;
        /* recovery database the pulled records go into */
        struct recdb_context *recdb;
        /* node that reported the highest sequence number */
        uint32_t max_pnn;
};
1223
/* Callbacks for the GET_DB_SEQNUM control and the following pull_database request */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1226
1227 static struct tevent_req *collect_highseqnum_db_send(
1228                         TALLOC_CTX *mem_ctx,
1229                         struct tevent_context *ev,
1230                         struct ctdb_client_context *client,
1231                         uint32_t *pnn_list, int count, uint32_t *caps,
1232                         uint32_t db_id, struct recdb_context *recdb)
1233 {
1234         struct tevent_req *req, *subreq;
1235         struct collect_highseqnum_db_state *state;
1236         struct ctdb_req_control request;
1237
1238         req = tevent_req_create(mem_ctx, &state,
1239                                 struct collect_highseqnum_db_state);
1240         if (req == NULL) {
1241                 return NULL;
1242         }
1243
1244         state->ev = ev;
1245         state->client = client;
1246         state->pnn_list = pnn_list;
1247         state->count = count;
1248         state->caps = caps;
1249         state->db_id = db_id;
1250         state->recdb = recdb;
1251
1252         ctdb_req_control_get_db_seqnum(&request, db_id);
1253         subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1254                                                 state->pnn_list, state->count,
1255                                                 TIMEOUT(), &request);
1256         if (tevent_req_nomem(subreq, req)) {
1257                 return tevent_req_post(req, ev);
1258         }
1259         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1260                                 req);
1261
1262         return req;
1263 }
1264
1265 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1266 {
1267         struct tevent_req *req = tevent_req_callback_data(
1268                 subreq, struct tevent_req);
1269         struct collect_highseqnum_db_state *state = tevent_req_data(
1270                 req, struct collect_highseqnum_db_state);
1271         struct ctdb_reply_control **reply;
1272         int *err_list;
1273         bool status;
1274         int ret, i;
1275         uint64_t seqnum, max_seqnum;
1276
1277         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1278                                                 &err_list, &reply);
1279         TALLOC_FREE(subreq);
1280         if (! status) {
1281                 int ret2;
1282                 uint32_t pnn;
1283
1284                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1285                                                        state->count, err_list,
1286                                                        &pnn);
1287                 if (ret2 != 0) {
1288                         LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1289                             " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1290                 } else {
1291                         LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1292                             recdb_name(state->recdb), ret);
1293                 }
1294                 tevent_req_error(req, ret);
1295                 return;
1296         }
1297
1298         max_seqnum = 0;
1299         state->max_pnn = state->pnn_list[0];
1300         for (i=0; i<state->count; i++) {
1301                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1302                 if (ret != 0) {
1303                         tevent_req_error(req, EPROTO);
1304                         return;
1305                 }
1306
1307                 if (max_seqnum < seqnum) {
1308                         max_seqnum = seqnum;
1309                         state->max_pnn = state->pnn_list[i];
1310                 }
1311         }
1312
1313         talloc_free(reply);
1314
1315         LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1316             recdb_name(state->recdb), state->max_pnn, max_seqnum);
1317
1318         subreq = pull_database_send(state, state->ev, state->client,
1319                                     state->max_pnn,
1320                                     state->caps[state->max_pnn],
1321                                     state->recdb);
1322         if (tevent_req_nomem(subreq, req)) {
1323                 return;
1324         }
1325         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1326                                 req);
1327 }
1328
1329 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1330 {
1331         struct tevent_req *req = tevent_req_callback_data(
1332                 subreq, struct tevent_req);
1333         int ret;
1334         bool status;
1335
1336         status = pull_database_recv(subreq, &ret);
1337         TALLOC_FREE(subreq);
1338         if (! status) {
1339                 tevent_req_error(req, ret);
1340                 return;
1341         }
1342
1343         tevent_req_done(req);
1344 }
1345
/*
 * Collect the result of collect_highseqnum_db_send().
 * Delegates to generic_recv() and returns its result unchanged.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
        bool status = generic_recv(req, perr);

        return status;
}
1350
1351 /*
1352  * Collect all databases
1353  */
1354
/* Request state for collect_all_db_send() */
struct collect_all_db_state {
        /* event context and client connection used for subrequests */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* nodes to pull from, one after another, and how many there are */
        uint32_t *pnn_list;
        int count;
        /* capability flags, looked up by node number when pulling */
        uint32_t *caps;
        /* id of the database being collected */
        uint32_t db_id;
        /* recovery database the pulled records go into */
        struct recdb_context *recdb;
        /* NOTE(review): not referenced by the collect_all_db code in view - confirm it is still needed */
        struct ctdb_pulldb pulldb;
        /* position in pnn_list of the node currently being pulled */
        int index;
};
1366
/* Callback run each time the pull_database request for one node completes */
static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1368
1369 static struct tevent_req *collect_all_db_send(
1370                         TALLOC_CTX *mem_ctx,
1371                         struct tevent_context *ev,
1372                         struct ctdb_client_context *client,
1373                         uint32_t *pnn_list, int count, uint32_t *caps,
1374                         uint32_t db_id, struct recdb_context *recdb)
1375 {
1376         struct tevent_req *req, *subreq;
1377         struct collect_all_db_state *state;
1378         uint32_t pnn;
1379
1380         req = tevent_req_create(mem_ctx, &state,
1381                                 struct collect_all_db_state);
1382         if (req == NULL) {
1383                 return NULL;
1384         }
1385
1386         state->ev = ev;
1387         state->client = client;
1388         state->pnn_list = pnn_list;
1389         state->count = count;
1390         state->caps = caps;
1391         state->db_id = db_id;
1392         state->recdb = recdb;
1393         state->index = 0;
1394
1395         pnn = state->pnn_list[state->index];
1396
1397         subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1398         if (tevent_req_nomem(subreq, req)) {
1399                 return tevent_req_post(req, ev);
1400         }
1401         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1402
1403         return req;
1404 }
1405
1406 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1407 {
1408         struct tevent_req *req = tevent_req_callback_data(
1409                 subreq, struct tevent_req);
1410         struct collect_all_db_state *state = tevent_req_data(
1411                 req, struct collect_all_db_state);
1412         uint32_t pnn;
1413         int ret;
1414         bool status;
1415
1416         status = pull_database_recv(subreq, &ret);
1417         TALLOC_FREE(subreq);
1418         if (! status) {
1419                 tevent_req_error(req, ret);
1420                 return;
1421         }
1422
1423         state->index += 1;
1424         if (state->index == state->count) {
1425                 tevent_req_done(req);
1426                 return;
1427         }
1428
1429         pnn = state->pnn_list[state->index];
1430         subreq = pull_database_send(state, state->ev, state->client,
1431                                     pnn, state->caps[pnn], state->recdb);
1432         if (tevent_req_nomem(subreq, req)) {
1433                 return;
1434         }
1435         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1436 }
1437
/*
 * Collect the result of collect_all_db_send().
 * Delegates to generic_recv() and returns its result unchanged.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
        bool status = generic_recv(req, perr);

        return status;
}
1442
1443
1444 /**
1445  * For each database do the following:
1446  *  - Get DB name
1447  *  - Get DB path
1448  *  - Freeze database on all nodes
1449  *  - Start transaction on all nodes
1450  *  - Collect database from all nodes
1451  *  - Wipe database on all nodes
1452  *  - Push database to all nodes
1453  *  - Commit transaction on all nodes
1454  *  - Thaw database on all nodes
1455  */
1456
1457 struct recover_db_state {
1458         struct tevent_context *ev;
1459         struct ctdb_client_context *client;
1460         struct ctdb_tunable_list *tun_list;
1461         uint32_t *pnn_list;
1462         int count;
1463         uint32_t *caps;
1464         uint32_t db_id;
1465         bool persistent;
1466
1467         uint32_t destnode;
1468         struct ctdb_transdb transdb;
1469
1470         const char *db_name, *db_path;
1471         struct recdb_context *recdb;
1472 };
1473
/* Completion callbacks, one for each asynchronous step of a database recovery */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
1483
1484 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1485                                           struct tevent_context *ev,
1486                                           struct ctdb_client_context *client,
1487                                           struct ctdb_tunable_list *tun_list,
1488                                           uint32_t *pnn_list, int count,
1489                                           uint32_t *caps,
1490                                           uint32_t generation,
1491                                           uint32_t db_id, bool persistent)
1492 {
1493         struct tevent_req *req, *subreq;
1494         struct recover_db_state *state;
1495         struct ctdb_req_control request;
1496
1497         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1498         if (req == NULL) {
1499                 return NULL;
1500         }
1501
1502         state->ev = ev;
1503         state->client = client;
1504         state->tun_list = tun_list;
1505         state->pnn_list = pnn_list;
1506         state->count = count;
1507         state->caps = caps;
1508         state->db_id = db_id;
1509         state->persistent = persistent;
1510
1511         state->destnode = ctdb_client_pnn(client);
1512         state->transdb.db_id = db_id;
1513         state->transdb.tid = generation;
1514
1515         ctdb_req_control_get_dbname(&request, db_id);
1516         subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1517                                           TIMEOUT(), &request);
1518         if (tevent_req_nomem(subreq, req)) {
1519                 return tevent_req_post(req, ev);
1520         }
1521         tevent_req_set_callback(subreq, recover_db_name_done, req);
1522
1523         return req;
1524 }
1525
1526 static void recover_db_name_done(struct tevent_req *subreq)
1527 {
1528         struct tevent_req *req = tevent_req_callback_data(
1529                 subreq, struct tevent_req);
1530         struct recover_db_state *state = tevent_req_data(
1531                 req, struct recover_db_state);
1532         struct ctdb_reply_control *reply;
1533         struct ctdb_req_control request;
1534         int ret;
1535         bool status;
1536
1537         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1538         TALLOC_FREE(subreq);
1539         if (! status) {
1540                 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1541                     state->db_id, ret);
1542                 tevent_req_error(req, ret);
1543                 return;
1544         }
1545
1546         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1547         if (ret != 0) {
1548                 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1549                     state->db_id, ret);
1550                 tevent_req_error(req, EPROTO);
1551                 return;
1552         }
1553
1554         talloc_free(reply);
1555
1556         ctdb_req_control_getdbpath(&request, state->db_id);
1557         subreq = ctdb_client_control_send(state, state->ev, state->client,
1558                                           state->destnode, TIMEOUT(),
1559                                           &request);
1560         if (tevent_req_nomem(subreq, req)) {
1561                 return;
1562         }
1563         tevent_req_set_callback(subreq, recover_db_path_done, req);
1564 }
1565
1566 static void recover_db_path_done(struct tevent_req *subreq)
1567 {
1568         struct tevent_req *req = tevent_req_callback_data(
1569                 subreq, struct tevent_req);
1570         struct recover_db_state *state = tevent_req_data(
1571                 req, struct recover_db_state);
1572         struct ctdb_reply_control *reply;
1573         struct ctdb_req_control request;
1574         int ret;
1575         bool status;
1576
1577         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1578         TALLOC_FREE(subreq);
1579         if (! status) {
1580                 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1581                     state->db_name, ret);
1582                 tevent_req_error(req, ret);
1583                 return;
1584         }
1585
1586         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1587         if (ret != 0) {
1588                 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1589                     state->db_name, ret);
1590                 tevent_req_error(req, EPROTO);
1591                 return;
1592         }
1593
1594         talloc_free(reply);
1595
1596         ctdb_req_control_db_freeze(&request, state->db_id);
1597         subreq = ctdb_client_control_multi_send(state, state->ev,
1598                                                 state->client,
1599                                                 state->pnn_list, state->count,
1600                                                 TIMEOUT(), &request);
1601         if (tevent_req_nomem(subreq, req)) {
1602                 return;
1603         }
1604         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1605 }
1606
1607 static void recover_db_freeze_done(struct tevent_req *subreq)
1608 {
1609         struct tevent_req *req = tevent_req_callback_data(
1610                 subreq, struct tevent_req);
1611         struct recover_db_state *state = tevent_req_data(
1612                 req, struct recover_db_state);
1613         struct ctdb_req_control request;
1614         int *err_list;
1615         int ret;
1616         bool status;
1617
1618         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1619                                                 NULL);
1620         TALLOC_FREE(subreq);
1621         if (! status) {
1622                 int ret2;
1623                 uint32_t pnn;
1624
1625                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1626                                                        state->count, err_list,
1627                                                        &pnn);
1628                 if (ret2 != 0) {
1629                         LOG("control FREEZE_DB failed for db %s on node %u,"
1630                             " ret=%d\n", state->db_name, pnn, ret2);
1631                 } else {
1632                         LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1633                             state->db_name, ret);
1634                 }
1635                 tevent_req_error(req, ret);
1636                 return;
1637         }
1638
1639         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1640         subreq = ctdb_client_control_multi_send(state, state->ev,
1641                                                 state->client,
1642                                                 state->pnn_list, state->count,
1643                                                 TIMEOUT(), &request);
1644         if (tevent_req_nomem(subreq, req)) {
1645                 return;
1646         }
1647         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1648 }
1649
1650 static void recover_db_transaction_started(struct tevent_req *subreq)
1651 {
1652         struct tevent_req *req = tevent_req_callback_data(
1653                 subreq, struct tevent_req);
1654         struct recover_db_state *state = tevent_req_data(
1655                 req, struct recover_db_state);
1656         int *err_list;
1657         int ret;
1658         bool status;
1659
1660         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1661                                                 NULL);
1662         TALLOC_FREE(subreq);
1663         if (! status) {
1664                 int ret2;
1665                 uint32_t pnn;
1666
1667                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1668                                                        state->count,
1669                                                        err_list, &pnn);
1670                 if (ret2 != 0) {
1671                         LOG("control TRANSACTION_DB failed for db=%s,"
1672                             " ret=%d\n", state->db_name, pnn, ret2);
1673                 } else {
1674                         LOG("control TRANSACTION_DB failed for db=%s,"
1675                             " ret=%d\n", state->db_name, ret);
1676                 }
1677                 tevent_req_error(req, ret);
1678                 return;
1679         }
1680
1681         state->recdb = recdb_create(state, state->db_id, state->db_name,
1682                                     state->db_path,
1683                                     state->tun_list->database_hash_size,
1684                                     state->persistent);
1685         if (tevent_req_nomem(state->recdb, req)) {
1686                 return;
1687         }
1688
1689         if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1690                 subreq = collect_highseqnum_db_send(
1691                                 state, state->ev, state->client,
1692                                 state->pnn_list, state->count, state->caps,
1693                                 state->db_id, state->recdb);
1694         } else {
1695                 subreq = collect_all_db_send(
1696                                 state, state->ev, state->client,
1697                                 state->pnn_list, state->count, state->caps,
1698                                 state->db_id, state->recdb);
1699         }
1700         if (tevent_req_nomem(subreq, req)) {
1701                 return;
1702         }
1703         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1704 }
1705
1706 static void recover_db_collect_done(struct tevent_req *subreq)
1707 {
1708         struct tevent_req *req = tevent_req_callback_data(
1709                 subreq, struct tevent_req);
1710         struct recover_db_state *state = tevent_req_data(
1711                 req, struct recover_db_state);
1712         struct ctdb_req_control request;
1713         int ret;
1714         bool status;
1715
1716         if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1717                 status = collect_highseqnum_db_recv(subreq, &ret);
1718         } else {
1719                 status = collect_all_db_recv(subreq, &ret);
1720         }
1721         TALLOC_FREE(subreq);
1722         if (! status) {
1723                 tevent_req_error(req, ret);
1724                 return;
1725         }
1726
1727         ctdb_req_control_wipe_database(&request, &state->transdb);
1728         subreq = ctdb_client_control_multi_send(state, state->ev,
1729                                                 state->client,
1730                                                 state->pnn_list, state->count,
1731                                                 TIMEOUT(), &request);
1732         if (tevent_req_nomem(subreq, req)) {
1733                 return;
1734         }
1735         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1736 }
1737
1738 static void recover_db_wipedb_done(struct tevent_req *subreq)
1739 {
1740         struct tevent_req *req = tevent_req_callback_data(
1741                 subreq, struct tevent_req);
1742         struct recover_db_state *state = tevent_req_data(
1743                 req, struct recover_db_state);
1744         int *err_list;
1745         int ret;
1746         bool status;
1747
1748         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1749                                                 NULL);
1750         TALLOC_FREE(subreq);
1751         if (! status) {
1752                 int ret2;
1753                 uint32_t pnn;
1754
1755                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1756                                                        state->count,
1757                                                        err_list, &pnn);
1758                 if (ret2 != 0) {
1759                         LOG("control WIPEDB failed for db %s on node %u,"
1760                             " ret=%d\n", state->db_name, pnn, ret2);
1761                 } else {
1762                         LOG("control WIPEDB failed for db %s, ret=%d\n",
1763                             state->db_name, pnn, ret);
1764                 }
1765                 tevent_req_error(req, ret);
1766                 return;
1767         }
1768
1769         subreq = push_database_send(state, state->ev, state->client,
1770                                     state->pnn_list, state->count,
1771                                     state->caps, state->tun_list,
1772                                     state->recdb);
1773         if (tevent_req_nomem(subreq, req)) {
1774                 return;
1775         }
1776         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1777 }
1778
1779 static void recover_db_pushdb_done(struct tevent_req *subreq)
1780 {
1781         struct tevent_req *req = tevent_req_callback_data(
1782                 subreq, struct tevent_req);
1783         struct recover_db_state *state = tevent_req_data(
1784                 req, struct recover_db_state);
1785         struct ctdb_req_control request;
1786         int ret;
1787         bool status;
1788
1789         status = push_database_recv(subreq, &ret);
1790         TALLOC_FREE(subreq);
1791         if (! status) {
1792                 tevent_req_error(req, ret);
1793                 return;
1794         }
1795
1796         TALLOC_FREE(state->recdb);
1797
1798         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1799         subreq = ctdb_client_control_multi_send(state, state->ev,
1800                                                 state->client,
1801                                                 state->pnn_list, state->count,
1802                                                 TIMEOUT(), &request);
1803         if (tevent_req_nomem(subreq, req)) {
1804                 return;
1805         }
1806         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1807 }
1808
1809 static void recover_db_transaction_committed(struct tevent_req *subreq)
1810 {
1811         struct tevent_req *req = tevent_req_callback_data(
1812                 subreq, struct tevent_req);
1813         struct recover_db_state *state = tevent_req_data(
1814                 req, struct recover_db_state);
1815         struct ctdb_req_control request;
1816         int *err_list;
1817         int ret;
1818         bool status;
1819
1820         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1821                                                 NULL);
1822         TALLOC_FREE(subreq);
1823         if (! status) {
1824                 int ret2;
1825                 uint32_t pnn;
1826
1827                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1828                                                        state->count,
1829                                                        err_list, &pnn);
1830                 if (ret2 != 0) {
1831                         LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1832                             " on node %u, ret=%d\n", state->db_name, pnn, ret2);
1833                 } else {
1834                         LOG("control DB_TRANSACTION_COMMIT failed for db %s,"
1835                             " ret=%d\n", state->db_name, ret);
1836                 }
1837                 tevent_req_error(req, ret);
1838                 return;
1839         }
1840
1841         ctdb_req_control_db_thaw(&request, state->db_id);
1842         subreq = ctdb_client_control_multi_send(state, state->ev,
1843                                                 state->client,
1844                                                 state->pnn_list, state->count,
1845                                                 TIMEOUT(), &request);
1846         if (tevent_req_nomem(subreq, req)) {
1847                 return;
1848         }
1849         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1850 }
1851
1852 static void recover_db_thaw_done(struct tevent_req *subreq)
1853 {
1854         struct tevent_req *req = tevent_req_callback_data(
1855                 subreq, struct tevent_req);
1856         struct recover_db_state *state = tevent_req_data(
1857                 req, struct recover_db_state);
1858         int *err_list;
1859         int ret;
1860         bool status;
1861
1862         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1863                                                 NULL);
1864         TALLOC_FREE(subreq);
1865         if (! status) {
1866                 int ret2;
1867                 uint32_t pnn;
1868
1869                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1870                                                        state->count,
1871                                                        err_list, &pnn);
1872                 if (ret2 != 0) {
1873                         LOG("control DB_THAW failed for db %s on node %u,"
1874                             " ret=%d\n", state->db_name, pnn, ret2);
1875                 } else {
1876                         LOG("control DB_THAW failed for db %s, ret=%d\n",
1877                             state->db_name, ret);
1878                 }
1879                 tevent_req_error(req, ret);
1880                 return;
1881         }
1882
1883         tevent_req_done(req);
1884 }
1885
1886 static bool recover_db_recv(struct tevent_req *req)
1887 {
1888         return generic_recv(req, NULL);
1889 }
1890
1891
1892 /*
1893  * Start database recovery for each database
1894  *
1895  * Try to recover each database 5 times before failing recovery.
1896  */
1897
/* Shared state of one db_recovery request covering all databases. */
struct db_recovery_state {
        /* Event context, reused when a database recovery is retried */
        struct tevent_context *ev;

        /* Databases being recovered; dbmap->num replies are expected */
        struct ctdb_dbid_map *dbmap;

        /* Number of databases that have finished (success or failure) */
        int num_replies;

        /* Number of databases that were given up on */
        int num_failed;
};
1904
/*
 * Per-database state handed to db_recovery_one_done().
 *
 * It keeps every argument needed to call recover_db_send() again when an
 * attempt fails, plus the number of failed attempts so far.
 */
struct db_recovery_one_state {
        /* The enclosing db_recovery request */
        struct tevent_req *req;

        /* Arguments forwarded unchanged to recover_db_send() */
        struct ctdb_client_context *client;
        struct ctdb_dbid_map *dbmap;
        struct ctdb_tunable_list *tun_list;
        uint32_t *pnn_list;
        int count;
        uint32_t *caps;
        uint32_t generation;

        /* Identity of the database this sub-state belongs to */
        uint32_t db_id;
        bool persistent;

        /* Failed recovery attempts so far for this database */
        int num_fails;
};
1918
1919 static void db_recovery_one_done(struct tevent_req *subreq);
1920
1921 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1922                                            struct tevent_context *ev,
1923                                            struct ctdb_client_context *client,
1924                                            struct ctdb_dbid_map *dbmap,
1925                                            struct ctdb_tunable_list *tun_list,
1926                                            uint32_t *pnn_list, int count,
1927                                            uint32_t *caps,
1928                                            uint32_t generation)
1929 {
1930         struct tevent_req *req, *subreq;
1931         struct db_recovery_state *state;
1932         int i;
1933
1934         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1935         if (req == NULL) {
1936                 return NULL;
1937         }
1938
1939         state->ev = ev;
1940         state->dbmap = dbmap;
1941         state->num_replies = 0;
1942         state->num_failed = 0;
1943
1944         if (dbmap->num == 0) {
1945                 tevent_req_done(req);
1946                 return tevent_req_post(req, ev);
1947         }
1948
1949         for (i=0; i<dbmap->num; i++) {
1950                 struct db_recovery_one_state *substate;
1951
1952                 substate = talloc_zero(state, struct db_recovery_one_state);
1953                 if (tevent_req_nomem(substate, req)) {
1954                         return tevent_req_post(req, ev);
1955                 }
1956
1957                 substate->req = req;
1958                 substate->client = client;
1959                 substate->dbmap = dbmap;
1960                 substate->tun_list = tun_list;
1961                 substate->pnn_list = pnn_list;
1962                 substate->count = count;
1963                 substate->caps = caps;
1964                 substate->generation = generation;
1965                 substate->db_id = dbmap->dbs[i].db_id;
1966                 substate->persistent = dbmap->dbs[i].flags &
1967                                        CTDB_DB_FLAGS_PERSISTENT;
1968
1969                 subreq = recover_db_send(state, ev, client, tun_list,
1970                                          pnn_list, count, caps,
1971                                          generation, substate->db_id,
1972                                          substate->persistent);
1973                 if (tevent_req_nomem(subreq, req)) {
1974                         return tevent_req_post(req, ev);
1975                 }
1976                 tevent_req_set_callback(subreq, db_recovery_one_done,
1977                                         substate);
1978                 LOG("recover database 0x%08x\n", substate->db_id);
1979         }
1980
1981         return req;
1982 }
1983
1984 static void db_recovery_one_done(struct tevent_req *subreq)
1985 {
1986         struct db_recovery_one_state *substate = tevent_req_callback_data(
1987                 subreq, struct db_recovery_one_state);
1988         struct tevent_req *req = substate->req;
1989         struct db_recovery_state *state = tevent_req_data(
1990                 req, struct db_recovery_state);
1991         bool status;
1992
1993         status = recover_db_recv(subreq);
1994         TALLOC_FREE(subreq);
1995
1996         if (status) {
1997                 talloc_free(substate);
1998                 goto done;
1999         }
2000
2001         substate->num_fails += 1;
2002         if (substate->num_fails < 5) {
2003                 subreq = recover_db_send(state, state->ev, substate->client,
2004                                          substate->tun_list,
2005                                          substate->pnn_list, substate->count,
2006                                          substate->caps,
2007                                          substate->generation, substate->db_id,
2008                                          substate->persistent);
2009                 if (tevent_req_nomem(subreq, req)) {
2010                         goto failed;
2011                 }
2012                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2013                 LOG("recover database 0x%08x, attempt %d\n", substate->db_id,
2014                     substate->num_fails+1);
2015                 return;
2016         }
2017
2018 failed:
2019         state->num_failed += 1;
2020
2021 done:
2022         state->num_replies += 1;
2023
2024         if (state->num_replies == state->dbmap->num) {
2025                 tevent_req_done(req);
2026         }
2027 }
2028
2029 static bool db_recovery_recv(struct tevent_req *req, int *count)
2030 {
2031         struct db_recovery_state *state = tevent_req_data(
2032                 req, struct db_recovery_state);
2033         int err;
2034
2035         if (tevent_req_is_unix_error(req, &err)) {
2036                 *count = 0;
2037                 return false;
2038         }
2039
2040         *count = state->num_replies - state->num_failed;
2041
2042         if (state->num_failed > 0) {
2043                 return false;
2044         }
2045
2046         return true;
2047 }
2048
2049
2050 /*
2051  * Run the parallel database recovery
2052  *
2053  * - Get tunables
2054  * - Get nodemap
2055  * - Get vnnmap
2056  * - Get capabilities from all nodes
2057  * - Get dbmap
2058  * - Set RECOVERY_ACTIVE
2059  * - Send START_RECOVERY
2060  * - Update vnnmap on all nodes
2061  * - Run database recovery
2062  * - Send END_RECOVERY
2063  * - Set RECOVERY_NORMAL
2064  */
2065
/* State carried through the chain of callbacks of one recovery request. */
struct recovery_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;

        /* Generation passed to recovery_send(); stored in the new VNN map */
        uint32_t generation;

        /* Active nodes (from list_of_active_nodes()) and their number */
        uint32_t *pnn_list;
        int count;

        /* Node that single-node controls are sent to: ctdb_client_pnn() */
        uint32_t destnode;

        /* Reply data gathered from the controls, in the order fetched */
        struct ctdb_node_map *nodemap;
        uint32_t *caps;         /* nodemap->num entries, filled per PNN */
        struct ctdb_tunable_list *tun_list;
        struct ctdb_vnn_map *vnnmap;
        struct ctdb_dbid_map *dbmap;
};
2079
2080 static void recovery_tunables_done(struct tevent_req *subreq);
2081 static void recovery_nodemap_done(struct tevent_req *subreq);
2082 static void recovery_vnnmap_done(struct tevent_req *subreq);
2083 static void recovery_capabilities_done(struct tevent_req *subreq);
2084 static void recovery_dbmap_done(struct tevent_req *subreq);
2085 static void recovery_active_done(struct tevent_req *subreq);
2086 static void recovery_start_recovery_done(struct tevent_req *subreq);
2087 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2088 static void recovery_db_recovery_done(struct tevent_req *subreq);
2089 static void recovery_normal_done(struct tevent_req *subreq);
2090 static void recovery_end_recovery_done(struct tevent_req *subreq);
2091
2092 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2093                                         struct tevent_context *ev,
2094                                         struct ctdb_client_context *client,
2095                                         uint32_t generation)
2096 {
2097         struct tevent_req *req, *subreq;
2098         struct recovery_state *state;
2099         struct ctdb_req_control request;
2100
2101         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2102         if (req == NULL) {
2103                 return NULL;
2104         }
2105
2106         state->ev = ev;
2107         state->client = client;
2108         state->generation = generation;
2109         state->destnode = ctdb_client_pnn(client);
2110
2111         ctdb_req_control_get_all_tunables(&request);
2112         subreq = ctdb_client_control_send(state, state->ev, state->client,
2113                                           state->destnode, TIMEOUT(),
2114                                           &request);
2115         if (tevent_req_nomem(subreq, req)) {
2116                 return tevent_req_post(req, ev);
2117         }
2118         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2119
2120         return req;
2121 }
2122
2123 static void recovery_tunables_done(struct tevent_req *subreq)
2124 {
2125         struct tevent_req *req = tevent_req_callback_data(
2126                 subreq, struct tevent_req);
2127         struct recovery_state *state = tevent_req_data(
2128                 req, struct recovery_state);
2129         struct ctdb_reply_control *reply;
2130         struct ctdb_req_control request;
2131         int ret;
2132         bool status;
2133
2134         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2135         TALLOC_FREE(subreq);
2136         if (! status) {
2137                 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2138                 tevent_req_error(req, ret);
2139                 return;
2140         }
2141
2142         ret = ctdb_reply_control_get_all_tunables(reply, state,
2143                                                   &state->tun_list);
2144         if (ret != 0) {
2145                 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2146                 tevent_req_error(req, EPROTO);
2147                 return;
2148         }
2149
2150         talloc_free(reply);
2151
2152         recover_timeout = state->tun_list->recover_timeout;
2153
2154         ctdb_req_control_get_nodemap(&request);
2155         subreq = ctdb_client_control_send(state, state->ev, state->client,
2156                                           state->destnode, TIMEOUT(),
2157                                           &request);
2158         if (tevent_req_nomem(subreq, req)) {
2159                 return;
2160         }
2161         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2162 }
2163
2164 static void recovery_nodemap_done(struct tevent_req *subreq)
2165 {
2166         struct tevent_req *req = tevent_req_callback_data(
2167                 subreq, struct tevent_req);
2168         struct recovery_state *state = tevent_req_data(
2169                 req, struct recovery_state);
2170         struct ctdb_reply_control *reply;
2171         struct ctdb_req_control request;
2172         bool status;
2173         int ret;
2174
2175         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2176         TALLOC_FREE(subreq);
2177         if (! status) {
2178                 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2179                     state->destnode, ret);
2180                 tevent_req_error(req, ret);
2181                 return;
2182         }
2183
2184         ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2185         if (ret != 0) {
2186                 LOG("control GET_NODEMAP failed, ret=%d\n", ret);
2187                 tevent_req_error(req, ret);
2188                 return;
2189         }
2190
2191         state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2192                                             state, &state->pnn_list);
2193         if (state->count <= 0) {
2194                 tevent_req_error(req, ENOMEM);
2195                 return;
2196         }
2197
2198         ctdb_req_control_getvnnmap(&request);
2199         subreq = ctdb_client_control_send(state, state->ev, state->client,
2200                                           state->destnode, TIMEOUT(),
2201                                           &request);
2202         if (tevent_req_nomem(subreq, req)) {
2203                 return;
2204         }
2205         tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2206 }
2207
2208 static void recovery_vnnmap_done(struct tevent_req *subreq)
2209 {
2210         struct tevent_req *req = tevent_req_callback_data(
2211                 subreq, struct tevent_req);
2212         struct recovery_state *state = tevent_req_data(
2213                 req, struct recovery_state);
2214         struct ctdb_reply_control *reply;
2215         struct ctdb_req_control request;
2216         bool status;
2217         int ret;
2218
2219         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2220         TALLOC_FREE(subreq);
2221         if (! status) {
2222                 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2223                     state->destnode, ret);
2224                 tevent_req_error(req, ret);
2225                 return;
2226         }
2227
2228         ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2229         if (ret != 0) {
2230                 LOG("control GETVNNMAP failed, ret=%d\n", ret);
2231                 tevent_req_error(req, ret);
2232                 return;
2233         }
2234
2235         ctdb_req_control_get_capabilities(&request);
2236         subreq = ctdb_client_control_multi_send(state, state->ev,
2237                                                 state->client,
2238                                                 state->pnn_list, state->count,
2239                                                 TIMEOUT(), &request);
2240         if (tevent_req_nomem(subreq, req)) {
2241                 return;
2242         }
2243         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2244 }
2245
2246 static void recovery_capabilities_done(struct tevent_req *subreq)
2247 {
2248         struct tevent_req *req = tevent_req_callback_data(
2249                 subreq, struct tevent_req);
2250         struct recovery_state *state = tevent_req_data(
2251                 req, struct recovery_state);
2252         struct ctdb_reply_control **reply;
2253         struct ctdb_req_control request;
2254         int *err_list;
2255         int ret, i;
2256         bool status;
2257
2258         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2259                                                 &reply);
2260         TALLOC_FREE(subreq);
2261         if (! status) {
2262                 int ret2;
2263                 uint32_t pnn;
2264
2265                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2266                                                        state->count,
2267                                                        err_list, &pnn);
2268                 if (ret2 != 0) {
2269                         LOG("control GET_CAPABILITIES failed on node %u,"
2270                             " ret=%d\n", pnn, ret2);
2271                 } else {
2272                         LOG("control GET_CAPABILITIES failed, ret=%d\n", ret);
2273                 }
2274                 tevent_req_error(req, ret);
2275                 return;
2276         }
2277
2278         /* Make the array size same as nodemap */
2279         state->caps = talloc_zero_array(state, uint32_t,
2280                                         state->nodemap->num);
2281         if (tevent_req_nomem(state->caps, req)) {
2282                 return;
2283         }
2284
2285         for (i=0; i<state->count; i++) {
2286                 uint32_t pnn;
2287
2288                 pnn = state->pnn_list[i];
2289                 ret = ctdb_reply_control_get_capabilities(reply[i],
2290                                                           &state->caps[pnn]);
2291                 if (ret != 0) {
2292                         LOG("control GET_CAPABILITIES failed on node %u\n", pnn);
2293                         tevent_req_error(req, EPROTO);
2294                         return;
2295                 }
2296         }
2297
2298         talloc_free(reply);
2299
2300         ctdb_req_control_get_dbmap(&request);
2301         subreq = ctdb_client_control_send(state, state->ev, state->client,
2302                                           state->destnode, TIMEOUT(),
2303                                           &request);
2304         if (tevent_req_nomem(subreq, req)) {
2305                 return;
2306         }
2307         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2308 }
2309
2310 static void recovery_dbmap_done(struct tevent_req *subreq)
2311 {
2312         struct tevent_req *req = tevent_req_callback_data(
2313                 subreq, struct tevent_req);
2314         struct recovery_state *state = tevent_req_data(
2315                 req, struct recovery_state);
2316         struct ctdb_reply_control *reply;
2317         struct ctdb_req_control request;
2318         int ret;
2319         bool status;
2320
2321         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2322         TALLOC_FREE(subreq);
2323         if (! status) {
2324                 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2325                     state->destnode, ret);
2326                 tevent_req_error(req, ret);
2327                 return;
2328         }
2329
2330         ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2331         if (ret != 0) {
2332                 LOG("control GET_DBMAP failed, ret=%d\n", ret);
2333                 tevent_req_error(req, ret);
2334                 return;
2335         }
2336
2337         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2338         subreq = ctdb_client_control_multi_send(state, state->ev,
2339                                                 state->client,
2340                                                 state->pnn_list, state->count,
2341                                                 TIMEOUT(), &request);
2342         if (tevent_req_nomem(subreq, req)) {
2343                 return;
2344         }
2345         tevent_req_set_callback(subreq, recovery_active_done, req);
2346 }
2347
2348 static void recovery_active_done(struct tevent_req *subreq)
2349 {
2350         struct tevent_req *req = tevent_req_callback_data(
2351                 subreq, struct tevent_req);
2352         struct recovery_state *state = tevent_req_data(
2353                 req, struct recovery_state);
2354         struct ctdb_req_control request;
2355         struct ctdb_vnn_map *vnnmap;
2356         int *err_list;
2357         int ret, count, i;
2358         bool status;
2359
2360         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2361                                                 NULL);
2362         TALLOC_FREE(subreq);
2363         if (! status) {
2364                 int ret2;
2365                 uint32_t pnn;
2366
2367                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2368                                                        state->count,
2369                                                        err_list, &pnn);
2370                 if (ret2 != 0) {
2371                         LOG("failed to set recovery mode to ACTIVE on node %u,"
2372                             " ret=%d\n", pnn, ret2);
2373                 } else {
2374                         LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2375                             ret);
2376                 }
2377                 tevent_req_error(req, ret);
2378                 return;
2379         }
2380
2381         LOG("set recovery mode to ACTIVE\n");
2382
2383         /* Calculate new VNNMAP */
2384         count = 0;
2385         for (i=0; i<state->nodemap->num; i++) {
2386                 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2387                         continue;
2388                 }
2389                 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2390                         continue;
2391                 }
2392                 count += 1;
2393         }
2394
2395         if (count == 0) {
2396                 LOG("no active lmasters found. Adding recmaster anyway\n");
2397         }
2398
2399         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2400         if (tevent_req_nomem(vnnmap, req)) {
2401                 return;
2402         }
2403
2404         vnnmap->size = (count == 0 ? 1 : count);
2405         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2406         if (tevent_req_nomem(vnnmap->map, req)) {
2407                 return;
2408         }
2409
2410         if (count == 0) {
2411                 vnnmap->map[0] = state->destnode;
2412         } else {
2413                 count = 0;
2414                 for (i=0; i<state->nodemap->num; i++) {
2415                         if (state->nodemap->node[i].flags &
2416                             NODE_FLAGS_INACTIVE) {
2417                                 continue;
2418                         }
2419                         if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2420                                 continue;
2421                         }
2422
2423                         vnnmap->map[count] = state->nodemap->node[i].pnn;
2424                         count += 1;
2425                 }
2426         }
2427
2428         vnnmap->generation = state->generation;
2429
2430         talloc_free(state->vnnmap);
2431         state->vnnmap = vnnmap;
2432
2433         ctdb_req_control_start_recovery(&request);
2434         subreq = ctdb_client_control_multi_send(state, state->ev,
2435                                                 state->client,
2436                                                 state->pnn_list, state->count,
2437                                                 TIMEOUT(), &request);
2438         if (tevent_req_nomem(subreq, req)) {
2439                 return;
2440         }
2441         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2442 }
2443
2444 static void recovery_start_recovery_done(struct tevent_req *subreq)
2445 {
2446         struct tevent_req *req = tevent_req_callback_data(
2447                 subreq, struct tevent_req);
2448         struct recovery_state *state = tevent_req_data(
2449                 req, struct recovery_state);
2450         struct ctdb_req_control request;
2451         int *err_list;
2452         int ret;
2453         bool status;
2454
2455         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2456                                                 NULL);
2457         TALLOC_FREE(subreq);
2458         if (! status) {
2459                 int ret2;
2460                 uint32_t pnn;
2461
2462                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2463                                                        state->count,
2464                                                        err_list, &pnn);
2465                 if (ret2 != 0) {
2466                         LOG("failed to run start_recovery event on node %u,"
2467                             " ret=%d\n", pnn, ret2);
2468                 } else {
2469                         LOG("failed to run start_recovery event, ret=%d\n",
2470                             ret);
2471                 }
2472                 tevent_req_error(req, ret);
2473                 return;
2474         }
2475
2476         LOG("start_recovery event finished\n");
2477
2478         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2479         subreq = ctdb_client_control_multi_send(state, state->ev,
2480                                                 state->client,
2481                                                 state->pnn_list, state->count,
2482                                                 TIMEOUT(), &request);
2483         if (tevent_req_nomem(subreq, req)) {
2484                 return;
2485         }
2486         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2487 }
2488
2489 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2490 {
2491         struct tevent_req *req = tevent_req_callback_data(
2492                 subreq, struct tevent_req);
2493         struct recovery_state *state = tevent_req_data(
2494                 req, struct recovery_state);
2495         int *err_list;
2496         int ret;
2497         bool status;
2498
2499         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2500                                                 NULL);
2501         TALLOC_FREE(subreq);
2502         if (! status) {
2503                 int ret2;
2504                 uint32_t pnn;
2505
2506                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2507                                                        state->count,
2508                                                        err_list, &pnn);
2509                 if (ret2 != 0) {
2510                         LOG("failed to update VNNMAP on node %u, ret=%d\n",
2511                             pnn, ret2);
2512                 } else {
2513                         LOG("failed to update VNNMAP, ret=%d\n", ret);
2514                 }
2515                 tevent_req_error(req, ret);
2516                 return;
2517         }
2518
2519         LOG("updated VNNMAP\n");
2520
2521         subreq = db_recovery_send(state, state->ev, state->client,
2522                                   state->dbmap, state->tun_list,
2523                                   state->pnn_list, state->count,
2524                                   state->caps, state->vnnmap->generation);
2525         if (tevent_req_nomem(subreq, req)) {
2526                 return;
2527         }
2528         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2529 }
2530
2531 static void recovery_db_recovery_done(struct tevent_req *subreq)
2532 {
2533         struct tevent_req *req = tevent_req_callback_data(
2534                 subreq, struct tevent_req);
2535         struct recovery_state *state = tevent_req_data(
2536                 req, struct recovery_state);
2537         struct ctdb_req_control request;
2538         bool status;
2539         int count;
2540
2541         status = db_recovery_recv(subreq, &count);
2542         TALLOC_FREE(subreq);
2543
2544         LOG("%d databases recovered\n", count);
2545
2546         if (! status) {
2547                 tevent_req_error(req, EIO);
2548                 return;
2549         }
2550
2551         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2552         subreq = ctdb_client_control_multi_send(state, state->ev,
2553                                                 state->client,
2554                                                 state->pnn_list, state->count,
2555                                                 TIMEOUT(), &request);
2556         if (tevent_req_nomem(subreq, req)) {
2557                 return;
2558         }
2559         tevent_req_set_callback(subreq, recovery_normal_done, req);
2560 }
2561
2562 static void recovery_normal_done(struct tevent_req *subreq)
2563 {
2564         struct tevent_req *req = tevent_req_callback_data(
2565                 subreq, struct tevent_req);
2566         struct recovery_state *state = tevent_req_data(
2567                 req, struct recovery_state);
2568         struct ctdb_req_control request;
2569         int *err_list;
2570         int ret;
2571         bool status;
2572
2573         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2574                                                 NULL);
2575         TALLOC_FREE(subreq);
2576         if (! status) {
2577                 int ret2;
2578                 uint32_t pnn;
2579
2580                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2581                                                        state->count,
2582                                                        err_list, &pnn);
2583                 if (ret2 != 0) {
2584                         LOG("failed to set recovery mode to NORMAL on node %u,"
2585                             " ret=%d\n", pnn, ret2);
2586                 } else {
2587                         LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2588                             ret);
2589                 }
2590                 tevent_req_error(req, ret);
2591                 return;
2592         }
2593
2594         LOG("set recovery mode to NORMAL\n");
2595
2596         ctdb_req_control_end_recovery(&request);
2597         subreq = ctdb_client_control_multi_send(state, state->ev,
2598                                                 state->client,
2599                                                 state->pnn_list, state->count,
2600                                                 TIMEOUT(), &request);
2601         if (tevent_req_nomem(subreq, req)) {
2602                 return;
2603         }
2604         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2605 }
2606
2607 static void recovery_end_recovery_done(struct tevent_req *subreq)
2608 {
2609         struct tevent_req *req = tevent_req_callback_data(
2610                 subreq, struct tevent_req);
2611         struct recovery_state *state = tevent_req_data(
2612                 req, struct recovery_state);
2613         int *err_list;
2614         int ret;
2615         bool status;
2616
2617         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2618                                                 NULL);
2619         TALLOC_FREE(subreq);
2620         if (! status) {
2621                 int ret2;
2622                 uint32_t pnn;
2623
2624                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2625                                                        state->count,
2626                                                        err_list, &pnn);
2627                 if (ret2 != 0) {
2628                         LOG("failed to run recovered event on node %u,"
2629                             " ret=%d\n", pnn, ret2);
2630                 } else {
2631                         LOG("failed to run recovered event, ret=%d\n", ret);
2632                 }
2633                 tevent_req_error(req, ret);
2634                 return;
2635         }
2636
2637         LOG("recovered event finished\n");
2638
2639         tevent_req_done(req);
2640 }
2641
/*
 * Collect the outcome of a recovery request.
 *
 * Simply delegates to generic_recv(); nothing is returned to the caller,
 * which only sees whatever generic_recv() stores through perr.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        generic_recv(req, perr);
}
2646
/*
 * Print the command line synopsis for this helper on stderr.
 */
static void usage(const char *progname)
{
        FILE *out = stderr;

        fprintf(out,
                "\nUsage: %s <log-fd> <output-fd> <ctdb-socket-path> <generation>\n",
                progname);
}
2652
2653
2654 /*
2655  * Arguments - log fd, write fd, socket path, generation
2656  */
2657 int main(int argc, char *argv[])
2658 {
2659         int log_fd, write_fd;
2660         const char *sockpath;
2661         TALLOC_CTX *mem_ctx;
2662         struct tevent_context *ev;
2663         struct ctdb_client_context *client;
2664         int ret;
2665         struct tevent_req *req;
2666         uint32_t generation;
2667
2668         if (argc != 5) {
2669                 usage(argv[0]);
2670                 exit(1);
2671         }
2672
2673         log_fd = atoi(argv[1]);
2674         if (log_fd != STDOUT_FILENO && log_fd != STDERR_FILENO) {
2675                 close(STDOUT_FILENO);
2676                 close(STDERR_FILENO);
2677                 dup2(log_fd, STDOUT_FILENO);
2678                 dup2(log_fd, STDERR_FILENO);
2679         }
2680         close(log_fd);
2681
2682         write_fd = atoi(argv[2]);
2683         sockpath = argv[3];
2684         generation = (uint32_t)strtoul(argv[4], NULL, 0);
2685
2686         mem_ctx = talloc_new(NULL);
2687         if (mem_ctx == NULL) {
2688                 LOG("talloc_new() failed\n");
2689                 goto failed;
2690         }
2691
2692         ev = tevent_context_init(mem_ctx);
2693         if (ev == NULL) {
2694                 LOG("tevent_context_init() failed\n");
2695                 goto failed;
2696         }
2697
2698         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2699         if (ret != 0) {
2700                 LOG("ctdb_client_init() failed, ret=%d\n", ret);
2701                 goto failed;
2702         }
2703
2704         req = recovery_send(mem_ctx, ev, client, generation);
2705         if (req == NULL) {
2706                 LOG("database_recover_send() failed\n");
2707                 goto failed;
2708         }
2709
2710         if (! tevent_req_poll(req, ev)) {
2711                 LOG("tevent_req_poll() failed\n");
2712                 goto failed;
2713         }
2714
2715         recovery_recv(req, &ret);
2716         TALLOC_FREE(req);
2717         if (ret != 0) {
2718                 LOG("database recovery failed, ret=%d\n", ret);
2719                 goto failed;
2720         }
2721
2722         sys_write(write_fd, &ret, sizeof(ret));
2723         return 0;
2724
2725 failed:
2726         talloc_free(mem_ctx);
2727         return 1;
2728 }