ctdb-tools: Stop "ctdb nodestatus" from always showing all nodes
[nivanova/samba-autobuild/.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
33
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
37
38 #include "common/logging.h"
39
/* Timeout in seconds used by TIMEOUT() for controls sent in this file */
static int recover_timeout = 30;

#define NUM_RETRIES     3

/* Timeout value passed to the client control calls, built from recover_timeout */
#define TIMEOUT()       timeval_current_ofs(recover_timeout, 0)

/* printf-style logging at DEBUG_NOTICE level */
#define LOG(...)        DEBUG(DEBUG_NOTICE, (__VA_ARGS__))
47
48 /*
49  * Utility functions
50  */
51
/*
 * Shared receive helper for requests that produce no result.
 *
 * Returns true when the request did not finish with a unix error.
 * Otherwise returns false and, if perr is not NULL, stores the error
 * code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
65
66 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
67
68 static uint64_t srvid_next(void)
69 {
70         rec_srvid += 1;
71         return rec_srvid;
72 }
73
74 /*
75  * Recovery database functions
76  */
77
/* Handle for the temporary recovery database built for one ctdb database */
struct recdb_context {
        /* id of the database being recovered */
        uint32_t db_id;
        /* name of the database; pointer as passed to recdb_create(), not copied */
        const char *db_name;
        /* path of the recovery tdb file: "<dir>/recdb.<db_name>" */
        const char *db_path;
        /* open tdb handle for the recovery database */
        struct tdb_wrap *db;
        /* true if the database being recovered is persistent */
        bool persistent;
};
85
86 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
87                                           const char *db_name,
88                                           const char *db_path,
89                                           uint32_t hash_size, bool persistent)
90 {
91         static char *db_dir_state = NULL;
92         struct recdb_context *recdb;
93         unsigned int tdb_flags;
94
95         recdb = talloc(mem_ctx, struct recdb_context);
96         if (recdb == NULL) {
97                 return NULL;
98         }
99
100         if (db_dir_state == NULL) {
101                 db_dir_state = getenv("CTDB_DBDIR_STATE");
102         }
103
104         recdb->db_name = db_name;
105         recdb->db_id = db_id;
106         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
107                                          db_dir_state != NULL ?
108                                             db_dir_state :
109                                             dirname(discard_const(db_path)),
110                                          db_name);
111         if (recdb->db_path == NULL) {
112                 talloc_free(recdb);
113                 return NULL;
114         }
115         unlink(recdb->db_path);
116
117         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
118         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
119                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
120         if (recdb->db == NULL) {
121                 talloc_free(recdb);
122                 LOG("failed to create recovery db %s\n", recdb->db_path);
123                 return NULL;
124         }
125
126         recdb->persistent = persistent;
127
128         return recdb;
129 }
130
131 static uint32_t recdb_id(struct recdb_context *recdb)
132 {
133         return recdb->db_id;
134 }
135
136 static const char *recdb_name(struct recdb_context *recdb)
137 {
138         return recdb->db_name;
139 }
140
141 static const char *recdb_path(struct recdb_context *recdb)
142 {
143         return recdb->db_path;
144 }
145
146 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
147 {
148         return recdb->db->tdb;
149 }
150
151 static bool recdb_persistent(struct recdb_context *recdb)
152 {
153         return recdb->persistent;
154 }
155
/* Private data handed to recdb_add_traverse() for each pulled record */
struct recdb_add_traverse_state {
        /* recovery database the records are merged into */
        struct recdb_context *recdb;
        /* PNN compared against an existing record's dmaster on RSN ties */
        int mypnn;
};
160
161 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
162                               TDB_DATA key, TDB_DATA data,
163                               void *private_data)
164 {
165         struct recdb_add_traverse_state *state =
166                 (struct recdb_add_traverse_state *)private_data;
167         struct ctdb_ltdb_header *hdr;
168         TDB_DATA prev_data;
169         int ret;
170
171         /* header is not marshalled separately in the pulldb control */
172         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
173                 return -1;
174         }
175
176         hdr = (struct ctdb_ltdb_header *)data.dptr;
177
178         /* fetch the existing record, if any */
179         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
180
181         if (prev_data.dptr != NULL) {
182                 struct ctdb_ltdb_header prev_hdr;
183
184                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
185                 free(prev_data.dptr);
186                 if (hdr->rsn < prev_hdr.rsn ||
187                     (hdr->rsn == prev_hdr.rsn &&
188                      prev_hdr.dmaster != state->mypnn)) {
189                         return 0;
190                 }
191         }
192
193         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
194         if (ret != 0) {
195                 return -1;
196         }
197         return 0;
198 }
199
200 static bool recdb_add(struct recdb_context *recdb, int mypnn,
201                       struct ctdb_rec_buffer *recbuf)
202 {
203         struct recdb_add_traverse_state state;
204         int ret;
205
206         state.recdb = recdb;
207         state.mypnn = mypnn;
208
209         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
210         if (ret != 0) {
211                 return false;
212         }
213
214         return true;
215 }
216
217 /* This function decides which records from recdb are retained */
218 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
219                              uint32_t reqid, uint32_t dmaster,
220                              TDB_DATA key, TDB_DATA data)
221 {
222         struct ctdb_ltdb_header *header;
223         int ret;
224
225         /*
226          * skip empty records - but NOT for persistent databases:
227          *
228          * The record-by-record mode of recovery deletes empty records.
229          * For persistent databases, this can lead to data corruption
230          * by deleting records that should be there:
231          *
232          * - Assume the cluster has been running for a while.
233          *
234          * - A record R in a persistent database has been created and
235          *   deleted a couple of times, the last operation being deletion,
236          *   leaving an empty record with a high RSN, say 10.
237          *
238          * - Now a node N is turned off.
239          *
240          * - This leaves the local database copy of D on N with the empty
241          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
242          *   the copy of record R.
243          *
244          * - Now the record is created again while node N is turned off.
245          *   This creates R with RSN = 1 on all nodes except for N.
246          *
247          * - Now node N is turned on again. The following recovery will chose
248          *   the older empty copy of R due to RSN 10 > RSN 1.
249          *
250          * ==> Hence the record is gone after the recovery.
251          *
252          * On databases like Samba's registry, this can damage the higher-level
253          * data structures built from the various tdb-level records.
254          */
255         if (!persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
256                 return 0;
257         }
258
259         /* update the dmaster field to point to us */
260         header = (struct ctdb_ltdb_header *)data.dptr;
261         if (!persistent) {
262                 header->dmaster = dmaster;
263                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
264         }
265
266         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
267         if (ret != 0) {
268                 return ret;
269         }
270
271         return 0;
272 }
273
/* Private data for recdb_records_traverse() */
struct recdb_records_traverse_state {
        /* buffer that retained records are appended to */
        struct ctdb_rec_buffer *recbuf;
        /* dmaster written into the header of non-persistent records */
        uint32_t dmaster;
        /* reqid passed along with every record added to recbuf */
        uint32_t reqid;
        /* true if the database is persistent (see recbuf_filter_add) */
        bool persistent;
        /* set when adding a record to recbuf failed */
        bool failed;
};
281
282 static int recdb_records_traverse(struct tdb_context *tdb,
283                                   TDB_DATA key, TDB_DATA data,
284                                   void *private_data)
285 {
286         struct recdb_records_traverse_state *state =
287                 (struct recdb_records_traverse_state *)private_data;
288         int ret;
289
290         ret = recbuf_filter_add(state->recbuf, state->persistent,
291                                 state->reqid, state->dmaster, key, data);
292         if (ret != 0) {
293                 state->failed = true;
294                 return ret;
295         }
296
297         return 0;
298 }
299
300 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
301                                              TALLOC_CTX *mem_ctx,
302                                              uint32_t dmaster)
303 {
304         struct recdb_records_traverse_state state;
305         int ret;
306
307         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
308         if (state.recbuf == NULL) {
309                 return NULL;
310         }
311         state.dmaster = dmaster;
312         state.reqid = 0;
313         state.persistent = recdb_persistent(recdb);
314         state.failed = false;
315
316         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
317                                 &state);
318         if (ret == -1 || state.failed) {
319                 LOG("Failed to marshall recovery records for %s\n",
320                     recdb_name(recdb));
321                 TALLOC_FREE(state.recbuf);
322                 return NULL;
323         }
324
325         return state.recbuf;
326 }
327
/* Private data for recdb_file_traverse() */
struct recdb_file_traverse_state {
        /* buffer currently being filled; replaced after each flush to fd */
        struct ctdb_rec_buffer *recbuf;
        /* recovery database being traversed */
        struct recdb_context *recdb;
        /* talloc context that replacement buffers are allocated on */
        TALLOC_CTX *mem_ctx;
        /* dmaster written into the header of non-persistent records */
        uint32_t dmaster;
        /* reqid passed along with every record added to recbuf */
        uint32_t reqid;
        /* true if the database is persistent (see recbuf_filter_add) */
        bool persistent;
        /* set when adding, writing or re-allocating a buffer failed */
        bool failed;
        /* file descriptor the filled buffers are written to */
        int fd;
        /* recbuf is flushed once its length exceeds this value */
        int max_size;
        /* number of buffers written to fd so far */
        int num_buffers;
};
340
341 static int recdb_file_traverse(struct tdb_context *tdb,
342                                TDB_DATA key, TDB_DATA data,
343                                void *private_data)
344 {
345         struct recdb_file_traverse_state *state =
346                 (struct recdb_file_traverse_state *)private_data;
347         int ret;
348
349         ret = recbuf_filter_add(state->recbuf, state->persistent,
350                                 state->reqid, state->dmaster, key, data);
351         if (ret != 0) {
352                 state->failed = true;
353                 return ret;
354         }
355
356         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
357                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
358                 if (ret != 0) {
359                         LOG("Failed to collect recovery records for %s\n",
360                             recdb_name(state->recdb));
361                         state->failed = true;
362                         return ret;
363                 }
364
365                 state->num_buffers += 1;
366
367                 TALLOC_FREE(state->recbuf);
368                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
369                                                      recdb_id(state->recdb));
370                 if (state->recbuf == NULL) {
371                         state->failed = true;
372                         return ENOMEM;
373                 }
374         }
375
376         return 0;
377 }
378
379 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
380                       uint32_t dmaster, int fd, int max_size)
381 {
382         struct recdb_file_traverse_state state;
383         int ret;
384
385         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
386         if (state.recbuf == NULL) {
387                 return -1;
388         }
389         state.recdb = recdb;
390         state.mem_ctx = mem_ctx;
391         state.dmaster = dmaster;
392         state.reqid = 0;
393         state.persistent = recdb_persistent(recdb);
394         state.failed = false;
395         state.fd = fd;
396         state.max_size = max_size;
397         state.num_buffers = 0;
398
399         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
400         if (ret == -1 || state.failed) {
401                 TALLOC_FREE(state.recbuf);
402                 return -1;
403         }
404
405         ret = ctdb_rec_buffer_write(state.recbuf, fd);
406         if (ret != 0) {
407                 LOG("Failed to collect recovery records for %s\n",
408                     recdb_name(recdb));
409                 TALLOC_FREE(state.recbuf);
410                 return -1;
411         }
412         state.num_buffers += 1;
413
414         LOG("Wrote %d buffers of recovery records for %s\n",
415             state.num_buffers, recdb_name(recdb));
416
417         return state.num_buffers;
418 }
419
420 /*
421  * Pull database from a single node
422  */
423
/* State of one pull_database_send() request */
struct pull_database_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* recovery database the pulled records are merged into */
        struct recdb_context *recdb;
        /* node the database is pulled from */
        uint32_t pnn;
        /* srvid the message handler is registered on (DB_PULL path) */
        uint64_t srvid;
        /* number of records received so far */
        int num_records;
};
432
433 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
434                                   void *private_data);
435 static void pull_database_register_done(struct tevent_req *subreq);
436 static void pull_database_old_done(struct tevent_req *subreq);
437 static void pull_database_unregister_done(struct tevent_req *subreq);
438 static void pull_database_new_done(struct tevent_req *subreq);
439
440 static struct tevent_req *pull_database_send(
441                         TALLOC_CTX *mem_ctx,
442                         struct tevent_context *ev,
443                         struct ctdb_client_context *client,
444                         uint32_t pnn, uint32_t caps,
445                         struct recdb_context *recdb)
446 {
447         struct tevent_req *req, *subreq;
448         struct pull_database_state *state;
449         struct ctdb_req_control request;
450
451         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
452         if (req == NULL) {
453                 return NULL;
454         }
455
456         state->ev = ev;
457         state->client = client;
458         state->recdb = recdb;
459         state->pnn = pnn;
460         state->srvid = srvid_next();
461
462         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
463                 subreq = ctdb_client_set_message_handler_send(
464                                         state, state->ev, state->client,
465                                         state->srvid, pull_database_handler,
466                                         req);
467                 if (tevent_req_nomem(subreq, req)) {
468                         return tevent_req_post(req, ev);
469                 }
470
471                 tevent_req_set_callback(subreq, pull_database_register_done,
472                                         req);
473
474         } else {
475                 struct ctdb_pulldb pulldb;
476
477                 pulldb.db_id = recdb_id(recdb);
478                 pulldb.lmaster = CTDB_LMASTER_ANY;
479
480                 ctdb_req_control_pull_db(&request, &pulldb);
481                 subreq = ctdb_client_control_send(state, state->ev,
482                                                   state->client,
483                                                   pnn, TIMEOUT(),
484                                                   &request);
485                 if (tevent_req_nomem(subreq, req)) {
486                         return tevent_req_post(req, ev);
487                 }
488                 tevent_req_set_callback(subreq, pull_database_old_done, req);
489         }
490
491         return req;
492 }
493
494 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
495                                   void *private_data)
496 {
497         struct tevent_req *req = talloc_get_type_abort(
498                 private_data, struct tevent_req);
499         struct pull_database_state *state = tevent_req_data(
500                 req, struct pull_database_state);
501         struct ctdb_rec_buffer *recbuf;
502         int ret;
503         bool status;
504
505         if (srvid != state->srvid) {
506                 return;
507         }
508
509         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
510         if (ret != 0) {
511                 LOG("Invalid data received for DB_PULL messages\n");
512                 return;
513         }
514
515         if (recbuf->db_id != recdb_id(state->recdb)) {
516                 talloc_free(recbuf);
517                 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
518                     recbuf->db_id, recdb_name(state->recdb));
519                 return;
520         }
521
522         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
523                            recbuf);
524         if (! status) {
525                 talloc_free(recbuf);
526                 LOG("Failed to add records to recdb for %s\n",
527                     recdb_name(state->recdb));
528                 return;
529         }
530
531         state->num_records += recbuf->count;
532         talloc_free(recbuf);
533 }
534
535 static void pull_database_register_done(struct tevent_req *subreq)
536 {
537         struct tevent_req *req = tevent_req_callback_data(
538                 subreq, struct tevent_req);
539         struct pull_database_state *state = tevent_req_data(
540                 req, struct pull_database_state);
541         struct ctdb_req_control request;
542         struct ctdb_pulldb_ext pulldb_ext;
543         int ret;
544         bool status;
545
546         status = ctdb_client_set_message_handler_recv(subreq, &ret);
547         TALLOC_FREE(subreq);
548         if (! status) {
549                 LOG("failed to set message handler for DB_PULL for %s\n",
550                     recdb_name(state->recdb));
551                 tevent_req_error(req, ret);
552                 return;
553         }
554
555         pulldb_ext.db_id = recdb_id(state->recdb);
556         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
557         pulldb_ext.srvid = state->srvid;
558
559         ctdb_req_control_db_pull(&request, &pulldb_ext);
560         subreq = ctdb_client_control_send(state, state->ev, state->client,
561                                           state->pnn, TIMEOUT(), &request);
562         if (tevent_req_nomem(subreq, req)) {
563                 return;
564         }
565         tevent_req_set_callback(subreq, pull_database_new_done, req);
566 }
567
568 static void pull_database_old_done(struct tevent_req *subreq)
569 {
570         struct tevent_req *req = tevent_req_callback_data(
571                 subreq, struct tevent_req);
572         struct pull_database_state *state = tevent_req_data(
573                 req, struct pull_database_state);
574         struct ctdb_reply_control *reply;
575         struct ctdb_rec_buffer *recbuf;
576         int ret;
577         bool status;
578
579         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
580         TALLOC_FREE(subreq);
581         if (! status) {
582                 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
583                     recdb_name(state->recdb), state->pnn, ret);
584                 tevent_req_error(req, ret);
585                 return;
586         }
587
588         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
589         talloc_free(reply);
590         if (ret != 0) {
591                 tevent_req_error(req, ret);
592                 return;
593         }
594
595         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
596                            recbuf);
597         if (! status) {
598                 talloc_free(recbuf);
599                 tevent_req_error(req, EIO);
600                 return;
601         }
602
603         state->num_records = recbuf->count;
604         talloc_free(recbuf);
605
606         LOG("Pulled %d records for db %s from node %d\n",
607             state->num_records, recdb_name(state->recdb), state->pnn);
608
609         tevent_req_done(req);
610 }
611
612 static void pull_database_new_done(struct tevent_req *subreq)
613 {
614         struct tevent_req *req = tevent_req_callback_data(
615                 subreq, struct tevent_req);
616         struct pull_database_state *state = tevent_req_data(
617                 req, struct pull_database_state);
618         struct ctdb_reply_control *reply;
619         uint32_t num_records;
620         int ret;
621         bool status;
622
623         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
624         TALLOC_FREE(subreq);
625         if (! status) {
626                 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
627                     recdb_name(state->recdb), state->pnn, ret);
628                 tevent_req_error(req, ret);
629                 return;
630         }
631
632         ret = ctdb_reply_control_db_pull(reply, &num_records);
633         talloc_free(reply);
634         if (num_records != state->num_records) {
635                 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
636                     num_records, state->num_records, recdb_name(state->recdb));
637                 tevent_req_error(req, EIO);
638                 return;
639         }
640
641         LOG("Pulled %d records for db %s from node %d\n",
642             state->num_records, recdb_name(state->recdb), state->pnn);
643
644         subreq = ctdb_client_remove_message_handler_send(
645                                         state, state->ev, state->client,
646                                         state->srvid, req);
647         if (tevent_req_nomem(subreq, req)) {
648                 return;
649         }
650         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
651 }
652
653 static void pull_database_unregister_done(struct tevent_req *subreq)
654 {
655         struct tevent_req *req = tevent_req_callback_data(
656                 subreq, struct tevent_req);
657         struct pull_database_state *state = tevent_req_data(
658                 req, struct pull_database_state);
659         int ret;
660         bool status;
661
662         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
663         TALLOC_FREE(subreq);
664         if (! status) {
665                 LOG("failed to remove message handler for DB_PULL for %s\n",
666                     recdb_name(state->recdb));
667                 tevent_req_error(req, ret);
668                 return;
669         }
670
671         tevent_req_done(req);
672 }
673
/*
 * Collect the result of pull_database_send().
 *
 * Returns true on success; on failure returns false and stores the error
 * in *perr when perr is not NULL.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
678
679 /*
680  * Push database to specified nodes (old style)
681  */
682
/* State of one push_database_old_send() request */
struct push_database_old_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* recovery database whose records are pushed */
        struct recdb_context *recdb;
        /* nodes to push to; pointer as passed by the caller, not copied */
        uint32_t *pnn_list;
        /* number of entries in pnn_list */
        int count;
        /* all records marshalled once, re-sent to every node */
        struct ctdb_rec_buffer *recbuf;
        /* index into pnn_list of the node currently being pushed to */
        int index;
};
692
693 static void push_database_old_push_done(struct tevent_req *subreq);
694
695 static struct tevent_req *push_database_old_send(
696                         TALLOC_CTX *mem_ctx,
697                         struct tevent_context *ev,
698                         struct ctdb_client_context *client,
699                         uint32_t *pnn_list, int count,
700                         struct recdb_context *recdb)
701 {
702         struct tevent_req *req, *subreq;
703         struct push_database_old_state *state;
704         struct ctdb_req_control request;
705         uint32_t pnn;
706
707         req = tevent_req_create(mem_ctx, &state,
708                                 struct push_database_old_state);
709         if (req == NULL) {
710                 return NULL;
711         }
712
713         state->ev = ev;
714         state->client = client;
715         state->recdb = recdb;
716         state->pnn_list = pnn_list;
717         state->count = count;
718         state->index = 0;
719
720         state->recbuf = recdb_records(recdb, state,
721                                       ctdb_client_pnn(client));
722         if (tevent_req_nomem(state->recbuf, req)) {
723                 return tevent_req_post(req, ev);
724         }
725
726         pnn = state->pnn_list[state->index];
727
728         ctdb_req_control_push_db(&request, state->recbuf);
729         subreq = ctdb_client_control_send(state, ev, client, pnn,
730                                           TIMEOUT(), &request);
731         if (tevent_req_nomem(subreq, req)) {
732                 return tevent_req_post(req, ev);
733         }
734         tevent_req_set_callback(subreq, push_database_old_push_done, req);
735
736         return req;
737 }
738
739 static void push_database_old_push_done(struct tevent_req *subreq)
740 {
741         struct tevent_req *req = tevent_req_callback_data(
742                 subreq, struct tevent_req);
743         struct push_database_old_state *state = tevent_req_data(
744                 req, struct push_database_old_state);
745         struct ctdb_req_control request;
746         uint32_t pnn;
747         int ret;
748         bool status;
749
750         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
751         TALLOC_FREE(subreq);
752         if (! status) {
753                 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
754                     recdb_name(state->recdb), state->pnn_list[state->index],
755                     ret);
756                 tevent_req_error(req, ret);
757                 return;
758         }
759
760         state->index += 1;
761         if (state->index == state->count) {
762                 TALLOC_FREE(state->recbuf);
763                 tevent_req_done(req);
764                 return;
765         }
766
767         pnn = state->pnn_list[state->index];
768
769         ctdb_req_control_push_db(&request, state->recbuf);
770         subreq = ctdb_client_control_send(state, state->ev, state->client,
771                                           pnn, TIMEOUT(), &request);
772         if (tevent_req_nomem(subreq, req)) {
773                 return;
774         }
775         tevent_req_set_callback(subreq, push_database_old_push_done, req);
776 }
777
/*
 * Collect the result of push_database_old_send().
 *
 * Returns true on success; on failure returns false and stores the error
 * in *perr when perr is not NULL.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
782
783 /*
784  * Push database to specified nodes (new style)
785  */
786
/* State of one push_database_new_send() request */
struct push_database_new_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* recovery database whose records are pushed */
        struct recdb_context *recdb;
        /* nodes to push to; pointer as passed by the caller, not copied */
        uint32_t *pnn_list;
        /* number of entries in pnn_list */
        int count;
        /* srvid announced to the nodes in the DB_PUSH_START control */
        uint64_t srvid;
        /* PNN of this client, used as dmaster for the pushed records */
        uint32_t dmaster;
        /* already-unlinked temporary file holding the marshalled buffers */
        int fd;
        /* number of buffers written to fd by recdb_file() */
        int num_buffers;
        /* number of those buffers sent so far */
        int num_buffers_sent;
        int num_records;
};
800
801 static void push_database_new_started(struct tevent_req *subreq);
802 static void push_database_new_send_msg(struct tevent_req *req);
803 static void push_database_new_send_done(struct tevent_req *subreq);
804 static void push_database_new_confirmed(struct tevent_req *subreq);
805
806 static struct tevent_req *push_database_new_send(
807                         TALLOC_CTX *mem_ctx,
808                         struct tevent_context *ev,
809                         struct ctdb_client_context *client,
810                         uint32_t *pnn_list, int count,
811                         struct recdb_context *recdb,
812                         int max_size)
813 {
814         struct tevent_req *req, *subreq;
815         struct push_database_new_state *state;
816         struct ctdb_req_control request;
817         struct ctdb_pulldb_ext pulldb_ext;
818         char *filename;
819         off_t offset;
820
821         req = tevent_req_create(mem_ctx, &state,
822                                 struct push_database_new_state);
823         if (req == NULL) {
824                 return NULL;
825         }
826
827         state->ev = ev;
828         state->client = client;
829         state->recdb = recdb;
830         state->pnn_list = pnn_list;
831         state->count = count;
832
833         state->srvid = srvid_next();
834         state->dmaster = ctdb_client_pnn(client);
835         state->num_buffers_sent = 0;
836         state->num_records = 0;
837
838         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
839         if (tevent_req_nomem(filename, req)) {
840                 return tevent_req_post(req, ev);
841         }
842
843         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
844         if (state->fd == -1) {
845                 tevent_req_error(req, errno);
846                 return tevent_req_post(req, ev);
847         }
848         unlink(filename);
849         talloc_free(filename);
850
851         state->num_buffers = recdb_file(recdb, state, state->dmaster,
852                                         state->fd, max_size);
853         if (state->num_buffers == -1) {
854                 tevent_req_error(req, ENOMEM);
855                 return tevent_req_post(req, ev);
856         }
857
858         offset = lseek(state->fd, 0, SEEK_SET);
859         if (offset != 0) {
860                 tevent_req_error(req, EIO);
861                 return tevent_req_post(req, ev);
862         }
863
864         pulldb_ext.db_id = recdb_id(recdb);
865         pulldb_ext.srvid = state->srvid;
866
867         ctdb_req_control_db_push_start(&request, &pulldb_ext);
868         subreq = ctdb_client_control_multi_send(state, ev, client,
869                                                 pnn_list, count,
870                                                 TIMEOUT(), &request);
871         if (tevent_req_nomem(subreq, req)) {
872                 return tevent_req_post(req, ev);
873         }
874         tevent_req_set_callback(subreq, push_database_new_started, req);
875
876         return req;
877 }
878
879 static void push_database_new_started(struct tevent_req *subreq)
880 {
881         struct tevent_req *req = tevent_req_callback_data(
882                 subreq, struct tevent_req);
883         struct push_database_new_state *state = tevent_req_data(
884                 req, struct push_database_new_state);
885         int *err_list;
886         int ret;
887         bool status;
888
889         status = ctdb_client_control_multi_recv(subreq, &ret, state,
890                                                 &err_list, NULL);
891         TALLOC_FREE(subreq);
892         if (! status) {
893                 int ret2;
894                 uint32_t pnn;
895
896                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
897                                                        state->count,
898                                                        err_list, &pnn);
899                 if (ret2 != 0) {
900                         LOG("control DB_PUSH_START failed for db %s "
901                             "on node %u, ret=%d\n",
902                             recdb_name(state->recdb), pnn, ret2);
903                 } else {
904                         LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
905                             recdb_name(state->recdb), ret);
906                 }
907                 talloc_free(err_list);
908
909                 tevent_req_error(req, ret);
910                 return;
911         }
912
913         push_database_new_send_msg(req);
914 }
915
916 static void push_database_new_send_msg(struct tevent_req *req)
917 {
918         struct push_database_new_state *state = tevent_req_data(
919                 req, struct push_database_new_state);
920         struct tevent_req *subreq;
921         struct ctdb_rec_buffer *recbuf;
922         struct ctdb_req_message message;
923         TDB_DATA data;
924         int ret;
925
926         if (state->num_buffers_sent == state->num_buffers) {
927                 struct ctdb_req_control request;
928
929                 ctdb_req_control_db_push_confirm(&request,
930                                                  recdb_id(state->recdb));
931                 subreq = ctdb_client_control_multi_send(state, state->ev,
932                                                         state->client,
933                                                         state->pnn_list,
934                                                         state->count,
935                                                         TIMEOUT(), &request);
936                 if (tevent_req_nomem(subreq, req)) {
937                         return;
938                 }
939                 tevent_req_set_callback(subreq, push_database_new_confirmed,
940                                         req);
941                 return;
942         }
943
944         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
945         if (ret != 0) {
946                 tevent_req_error(req, ret);
947                 return;
948         }
949
950         data.dsize = ctdb_rec_buffer_len(recbuf);
951         data.dptr = talloc_size(state, data.dsize);
952         if (tevent_req_nomem(data.dptr, req)) {
953                 return;
954         }
955
956         ctdb_rec_buffer_push(recbuf, data.dptr);
957
958         message.srvid = state->srvid;
959         message.data.data = data;
960
961         LOG("Pushing buffer %d with %d records for %s\n",
962             state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
963
964         subreq = ctdb_client_message_multi_send(state, state->ev,
965                                                 state->client,
966                                                 state->pnn_list, state->count,
967                                                 &message);
968         if (tevent_req_nomem(subreq, req)) {
969                 return;
970         }
971         tevent_req_set_callback(subreq, push_database_new_send_done, req);
972
973         state->num_records += recbuf->count;
974
975         talloc_free(data.dptr);
976         talloc_free(recbuf);
977 }
978
979 static void push_database_new_send_done(struct tevent_req *subreq)
980 {
981         struct tevent_req *req = tevent_req_callback_data(
982                 subreq, struct tevent_req);
983         struct push_database_new_state *state = tevent_req_data(
984                 req, struct push_database_new_state);
985         bool status;
986         int ret;
987
988         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
989         TALLOC_FREE(subreq);
990         if (! status) {
991                 LOG("Sending recovery records failed for %s\n",
992                     recdb_name(state->recdb));
993                 tevent_req_error(req, ret);
994                 return;
995         }
996
997         state->num_buffers_sent += 1;
998
999         push_database_new_send_msg(req);
1000 }
1001
1002 static void push_database_new_confirmed(struct tevent_req *subreq)
1003 {
1004         struct tevent_req *req = tevent_req_callback_data(
1005                 subreq, struct tevent_req);
1006         struct push_database_new_state *state = tevent_req_data(
1007                 req, struct push_database_new_state);
1008         struct ctdb_reply_control **reply;
1009         int *err_list;
1010         bool status;
1011         int ret, i;
1012         uint32_t num_records;
1013
1014         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1015                                                 &err_list, &reply);
1016         TALLOC_FREE(subreq);
1017         if (! status) {
1018                 int ret2;
1019                 uint32_t pnn;
1020
1021                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1022                                                        state->count, err_list,
1023                                                        &pnn);
1024                 if (ret2 != 0) {
1025                         LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1026                             " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1027                 } else {
1028                         LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1029                             recdb_name(state->recdb), ret);
1030                 }
1031                 tevent_req_error(req, ret);
1032                 return;
1033         }
1034
1035         for (i=0; i<state->count; i++) {
1036                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1037                                                          &num_records);
1038                 if (ret != 0) {
1039                         tevent_req_error(req, EPROTO);
1040                         return;
1041                 }
1042
1043                 if (num_records != state->num_records) {
1044                         LOG("Node %u received %d of %d records for %s\n",
1045                             state->pnn_list[i], num_records,
1046                             state->num_records, recdb_name(state->recdb));
1047                         tevent_req_error(req, EPROTO);
1048                         return;
1049                 }
1050         }
1051
1052         talloc_free(reply);
1053
1054         LOG("Pushed %d records for db %s\n",
1055             state->num_records, recdb_name(state->recdb));
1056
1057         tevent_req_done(req);
1058 }
1059
/*
 * Collect the result of a push_database_new request.
 *
 * Returns true on success; on failure returns false and, when perr is
 * not NULL, stores the unix error code there (see generic_recv()).
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1064
1065 /*
1066  * wrapper for push_database_old and push_database_new
1067  */
1068
struct push_database_state {
	/* Push to nodes without CTDB_CAP_FRAGMENTED_CONTROLS has finished,
	 * or there were no such nodes */
	bool old_done;
	/* Push to nodes with CTDB_CAP_FRAGMENTED_CONTROLS has finished,
	 * or there were no such nodes */
	bool new_done;
};

/* Completion callbacks for the two sub-requests of push_database_send() */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1075
1076 static struct tevent_req *push_database_send(
1077                         TALLOC_CTX *mem_ctx,
1078                         struct tevent_context *ev,
1079                         struct ctdb_client_context *client,
1080                         uint32_t *pnn_list, int count, uint32_t *caps,
1081                         struct ctdb_tunable_list *tun_list,
1082                         struct recdb_context *recdb)
1083 {
1084         struct tevent_req *req, *subreq;
1085         struct push_database_state *state;
1086         uint32_t *old_list, *new_list;
1087         int old_count, new_count;
1088         int i;
1089
1090         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1091         if (req == NULL) {
1092                 return NULL;
1093         }
1094
1095         state->old_done = false;
1096         state->new_done = false;
1097
1098         old_count = 0;
1099         new_count = 0;
1100         old_list = talloc_array(state, uint32_t, count);
1101         new_list = talloc_array(state, uint32_t, count);
1102         if (tevent_req_nomem(old_list, req) ||
1103             tevent_req_nomem(new_list,req)) {
1104                 return tevent_req_post(req, ev);
1105         }
1106
1107         for (i=0; i<count; i++) {
1108                 uint32_t pnn = pnn_list[i];
1109
1110                 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1111                         new_list[new_count] = pnn;
1112                         new_count += 1;
1113                 } else {
1114                         old_list[old_count] = pnn;
1115                         old_count += 1;
1116                 }
1117         }
1118
1119         if (old_count > 0) {
1120                 subreq = push_database_old_send(state, ev, client,
1121                                                 old_list, old_count, recdb);
1122                 if (tevent_req_nomem(subreq, req)) {
1123                         return tevent_req_post(req, ev);
1124                 }
1125                 tevent_req_set_callback(subreq, push_database_old_done, req);
1126         } else {
1127                 state->old_done = true;
1128         }
1129
1130         if (new_count > 0) {
1131                 subreq = push_database_new_send(state, ev, client,
1132                                                 new_list, new_count, recdb,
1133                                                 tun_list->rec_buffer_size_limit);
1134                 if (tevent_req_nomem(subreq, req)) {
1135                         return tevent_req_post(req, ev);
1136                 }
1137                 tevent_req_set_callback(subreq, push_database_new_done, req);
1138         } else {
1139                 state->new_done = true;
1140         }
1141
1142         return req;
1143 }
1144
1145 static void push_database_old_done(struct tevent_req *subreq)
1146 {
1147         struct tevent_req *req = tevent_req_callback_data(
1148                 subreq, struct tevent_req);
1149         struct push_database_state *state = tevent_req_data(
1150                 req, struct push_database_state);
1151         bool status;
1152         int ret;
1153
1154         status = push_database_old_recv(subreq, &ret);
1155         if (! status) {
1156                 tevent_req_error(req, ret);
1157                 return;
1158         }
1159
1160         state->old_done = true;
1161
1162         if (state->old_done && state->new_done) {
1163                 tevent_req_done(req);
1164         }
1165 }
1166
1167 static void push_database_new_done(struct tevent_req *subreq)
1168 {
1169         struct tevent_req *req = tevent_req_callback_data(
1170                 subreq, struct tevent_req);
1171         struct push_database_state *state = tevent_req_data(
1172                 req, struct push_database_state);
1173         bool status;
1174         int ret;
1175
1176         status = push_database_new_recv(subreq, &ret);
1177         if (! status) {
1178                 tevent_req_error(req, ret);
1179                 return;
1180         }
1181
1182         state->new_done = true;
1183
1184         if (state->old_done && state->new_done) {
1185                 tevent_req_done(req);
1186         }
1187 }
1188
/*
 * Collect the result of a push_database request.
 *
 * Returns true on success; on failure returns false and, when perr is
 * not NULL, stores the unix error code there (see generic_recv()).
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1193
1194 /*
1195  * Collect databases using highest sequence number
1196  */
1197
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Nodes queried for their sequence number (count entries) */
	uint32_t *pnn_list;
	int count;
	/* Node capabilities, indexed by pnn */
	uint32_t *caps;
	/* Per-node ban credits, indexed by pnn; bumped when a pull fails */
	uint32_t *ban_credits;
	uint32_t db_id;
	struct recdb_context *recdb;
	/* Node selected to pull the database from */
	uint32_t max_pnn;
};

/* Completion callbacks: sequence numbers gathered, database pulled */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1212
1213 static struct tevent_req *collect_highseqnum_db_send(
1214                         TALLOC_CTX *mem_ctx,
1215                         struct tevent_context *ev,
1216                         struct ctdb_client_context *client,
1217                         uint32_t *pnn_list, int count, uint32_t *caps,
1218                         uint32_t *ban_credits, uint32_t db_id,
1219                         struct recdb_context *recdb)
1220 {
1221         struct tevent_req *req, *subreq;
1222         struct collect_highseqnum_db_state *state;
1223         struct ctdb_req_control request;
1224
1225         req = tevent_req_create(mem_ctx, &state,
1226                                 struct collect_highseqnum_db_state);
1227         if (req == NULL) {
1228                 return NULL;
1229         }
1230
1231         state->ev = ev;
1232         state->client = client;
1233         state->pnn_list = pnn_list;
1234         state->count = count;
1235         state->caps = caps;
1236         state->ban_credits = ban_credits;
1237         state->db_id = db_id;
1238         state->recdb = recdb;
1239
1240         ctdb_req_control_get_db_seqnum(&request, db_id);
1241         subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1242                                                 state->pnn_list, state->count,
1243                                                 TIMEOUT(), &request);
1244         if (tevent_req_nomem(subreq, req)) {
1245                 return tevent_req_post(req, ev);
1246         }
1247         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1248                                 req);
1249
1250         return req;
1251 }
1252
1253 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1254 {
1255         struct tevent_req *req = tevent_req_callback_data(
1256                 subreq, struct tevent_req);
1257         struct collect_highseqnum_db_state *state = tevent_req_data(
1258                 req, struct collect_highseqnum_db_state);
1259         struct ctdb_reply_control **reply;
1260         int *err_list;
1261         bool status;
1262         int ret, i;
1263         uint64_t seqnum, max_seqnum;
1264
1265         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1266                                                 &err_list, &reply);
1267         TALLOC_FREE(subreq);
1268         if (! status) {
1269                 int ret2;
1270                 uint32_t pnn;
1271
1272                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1273                                                        state->count, err_list,
1274                                                        &pnn);
1275                 if (ret2 != 0) {
1276                         LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1277                             " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1278                 } else {
1279                         LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1280                             recdb_name(state->recdb), ret);
1281                 }
1282                 tevent_req_error(req, ret);
1283                 return;
1284         }
1285
1286         max_seqnum = 0;
1287         state->max_pnn = state->pnn_list[0];
1288         for (i=0; i<state->count; i++) {
1289                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1290                 if (ret != 0) {
1291                         tevent_req_error(req, EPROTO);
1292                         return;
1293                 }
1294
1295                 if (max_seqnum < seqnum) {
1296                         max_seqnum = seqnum;
1297                         state->max_pnn = state->pnn_list[i];
1298                 }
1299         }
1300
1301         talloc_free(reply);
1302
1303         LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1304             recdb_name(state->recdb), state->max_pnn, max_seqnum);
1305
1306         subreq = pull_database_send(state, state->ev, state->client,
1307                                     state->max_pnn,
1308                                     state->caps[state->max_pnn],
1309                                     state->recdb);
1310         if (tevent_req_nomem(subreq, req)) {
1311                 return;
1312         }
1313         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1314                                 req);
1315 }
1316
1317 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1318 {
1319         struct tevent_req *req = tevent_req_callback_data(
1320                 subreq, struct tevent_req);
1321         struct collect_highseqnum_db_state *state = tevent_req_data(
1322                 req, struct collect_highseqnum_db_state);
1323         int ret;
1324         bool status;
1325
1326         status = pull_database_recv(subreq, &ret);
1327         TALLOC_FREE(subreq);
1328         if (! status) {
1329                 state->ban_credits[state->max_pnn] += 1;
1330                 tevent_req_error(req, ret);
1331                 return;
1332         }
1333
1334         tevent_req_done(req);
1335 }
1336
/*
 * Collect the result of a collect_highseqnum_db request.
 *
 * Returns true on success; on failure returns false and, when perr is
 * not NULL, stores the unix error code there (see generic_recv()).
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1341
1342 /*
1343  * Collect all databases
1344  */
1345
1346 struct collect_all_db_state {
1347         struct tevent_context *ev;
1348         struct ctdb_client_context *client;
1349         uint32_t *pnn_list;
1350         int count;
1351         uint32_t *caps;
1352         uint32_t *ban_credits;
1353         uint32_t db_id;
1354         struct recdb_context *recdb;
1355         struct ctdb_pulldb pulldb;
1356         int index;
1357 };
1358
1359 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1360
1361 static struct tevent_req *collect_all_db_send(
1362                         TALLOC_CTX *mem_ctx,
1363                         struct tevent_context *ev,
1364                         struct ctdb_client_context *client,
1365                         uint32_t *pnn_list, int count, uint32_t *caps,
1366                         uint32_t *ban_credits, uint32_t db_id,
1367                         struct recdb_context *recdb)
1368 {
1369         struct tevent_req *req, *subreq;
1370         struct collect_all_db_state *state;
1371         uint32_t pnn;
1372
1373         req = tevent_req_create(mem_ctx, &state,
1374                                 struct collect_all_db_state);
1375         if (req == NULL) {
1376                 return NULL;
1377         }
1378
1379         state->ev = ev;
1380         state->client = client;
1381         state->pnn_list = pnn_list;
1382         state->count = count;
1383         state->caps = caps;
1384         state->ban_credits = ban_credits;
1385         state->db_id = db_id;
1386         state->recdb = recdb;
1387         state->index = 0;
1388
1389         pnn = state->pnn_list[state->index];
1390
1391         subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1392         if (tevent_req_nomem(subreq, req)) {
1393                 return tevent_req_post(req, ev);
1394         }
1395         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1396
1397         return req;
1398 }
1399
1400 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1401 {
1402         struct tevent_req *req = tevent_req_callback_data(
1403                 subreq, struct tevent_req);
1404         struct collect_all_db_state *state = tevent_req_data(
1405                 req, struct collect_all_db_state);
1406         uint32_t pnn;
1407         int ret;
1408         bool status;
1409
1410         status = pull_database_recv(subreq, &ret);
1411         TALLOC_FREE(subreq);
1412         if (! status) {
1413                 pnn = state->pnn_list[state->index];
1414                 state->ban_credits[pnn] += 1;
1415                 tevent_req_error(req, ret);
1416                 return;
1417         }
1418
1419         state->index += 1;
1420         if (state->index == state->count) {
1421                 tevent_req_done(req);
1422                 return;
1423         }
1424
1425         pnn = state->pnn_list[state->index];
1426         subreq = pull_database_send(state, state->ev, state->client,
1427                                     pnn, state->caps[pnn], state->recdb);
1428         if (tevent_req_nomem(subreq, req)) {
1429                 return;
1430         }
1431         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1432 }
1433
/*
 * Collect the result of a collect_all_db request.
 *
 * Returns true on success; on failure returns false and, when perr is
 * not NULL, stores the unix error code there (see generic_recv()).
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1438
1439
1440 /**
1441  * For each database do the following:
1442  *  - Get DB name
1443  *  - Get DB path
1444  *  - Freeze database on all nodes
1445  *  - Start transaction on all nodes
1446  *  - Collect database from all nodes
1447  *  - Wipe database on all nodes
1448  *  - Push database to all nodes
1449  *  - Commit transaction on all nodes
1450  *  - Thaw database on all nodes
1451  */
1452
1453 struct recover_db_state {
1454         struct tevent_context *ev;
1455         struct ctdb_client_context *client;
1456         struct ctdb_tunable_list *tun_list;
1457         uint32_t *pnn_list;
1458         int count;
1459         uint32_t *caps;
1460         uint32_t *ban_credits;
1461         uint32_t db_id;
1462         bool persistent;
1463
1464         uint32_t destnode;
1465         struct ctdb_transdb transdb;
1466
1467         const char *db_name, *db_path;
1468         struct recdb_context *recdb;
1469 };
1470
1471 static void recover_db_name_done(struct tevent_req *subreq);
1472 static void recover_db_path_done(struct tevent_req *subreq);
1473 static void recover_db_freeze_done(struct tevent_req *subreq);
1474 static void recover_db_transaction_started(struct tevent_req *subreq);
1475 static void recover_db_collect_done(struct tevent_req *subreq);
1476 static void recover_db_wipedb_done(struct tevent_req *subreq);
1477 static void recover_db_pushdb_done(struct tevent_req *subreq);
1478 static void recover_db_transaction_committed(struct tevent_req *subreq);
1479 static void recover_db_thaw_done(struct tevent_req *subreq);
1480
1481 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1482                                           struct tevent_context *ev,
1483                                           struct ctdb_client_context *client,
1484                                           struct ctdb_tunable_list *tun_list,
1485                                           uint32_t *pnn_list, int count,
1486                                           uint32_t *caps,
1487                                           uint32_t *ban_credits,
1488                                           uint32_t generation,
1489                                           uint32_t db_id, bool persistent)
1490 {
1491         struct tevent_req *req, *subreq;
1492         struct recover_db_state *state;
1493         struct ctdb_req_control request;
1494
1495         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1496         if (req == NULL) {
1497                 return NULL;
1498         }
1499
1500         state->ev = ev;
1501         state->client = client;
1502         state->tun_list = tun_list;
1503         state->pnn_list = pnn_list;
1504         state->count = count;
1505         state->caps = caps;
1506         state->ban_credits = ban_credits;
1507         state->db_id = db_id;
1508         state->persistent = persistent;
1509
1510         state->destnode = ctdb_client_pnn(client);
1511         state->transdb.db_id = db_id;
1512         state->transdb.tid = generation;
1513
1514         ctdb_req_control_get_dbname(&request, db_id);
1515         subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1516                                           TIMEOUT(), &request);
1517         if (tevent_req_nomem(subreq, req)) {
1518                 return tevent_req_post(req, ev);
1519         }
1520         tevent_req_set_callback(subreq, recover_db_name_done, req);
1521
1522         return req;
1523 }
1524
1525 static void recover_db_name_done(struct tevent_req *subreq)
1526 {
1527         struct tevent_req *req = tevent_req_callback_data(
1528                 subreq, struct tevent_req);
1529         struct recover_db_state *state = tevent_req_data(
1530                 req, struct recover_db_state);
1531         struct ctdb_reply_control *reply;
1532         struct ctdb_req_control request;
1533         int ret;
1534         bool status;
1535
1536         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1537         TALLOC_FREE(subreq);
1538         if (! status) {
1539                 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1540                     state->db_id, ret);
1541                 tevent_req_error(req, ret);
1542                 return;
1543         }
1544
1545         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1546         if (ret != 0) {
1547                 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1548                     state->db_id, ret);
1549                 tevent_req_error(req, EPROTO);
1550                 return;
1551         }
1552
1553         talloc_free(reply);
1554
1555         ctdb_req_control_getdbpath(&request, state->db_id);
1556         subreq = ctdb_client_control_send(state, state->ev, state->client,
1557                                           state->destnode, TIMEOUT(),
1558                                           &request);
1559         if (tevent_req_nomem(subreq, req)) {
1560                 return;
1561         }
1562         tevent_req_set_callback(subreq, recover_db_path_done, req);
1563 }
1564
1565 static void recover_db_path_done(struct tevent_req *subreq)
1566 {
1567         struct tevent_req *req = tevent_req_callback_data(
1568                 subreq, struct tevent_req);
1569         struct recover_db_state *state = tevent_req_data(
1570                 req, struct recover_db_state);
1571         struct ctdb_reply_control *reply;
1572         struct ctdb_req_control request;
1573         int ret;
1574         bool status;
1575
1576         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1577         TALLOC_FREE(subreq);
1578         if (! status) {
1579                 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1580                     state->db_name, ret);
1581                 tevent_req_error(req, ret);
1582                 return;
1583         }
1584
1585         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1586         if (ret != 0) {
1587                 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1588                     state->db_name, ret);
1589                 tevent_req_error(req, EPROTO);
1590                 return;
1591         }
1592
1593         talloc_free(reply);
1594
1595         ctdb_req_control_db_freeze(&request, state->db_id);
1596         subreq = ctdb_client_control_multi_send(state, state->ev,
1597                                                 state->client,
1598                                                 state->pnn_list, state->count,
1599                                                 TIMEOUT(), &request);
1600         if (tevent_req_nomem(subreq, req)) {
1601                 return;
1602         }
1603         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1604 }
1605
1606 static void recover_db_freeze_done(struct tevent_req *subreq)
1607 {
1608         struct tevent_req *req = tevent_req_callback_data(
1609                 subreq, struct tevent_req);
1610         struct recover_db_state *state = tevent_req_data(
1611                 req, struct recover_db_state);
1612         struct ctdb_req_control request;
1613         int *err_list;
1614         int ret;
1615         bool status;
1616
1617         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1618                                                 NULL);
1619         TALLOC_FREE(subreq);
1620         if (! status) {
1621                 int ret2;
1622                 uint32_t pnn;
1623
1624                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1625                                                        state->count, err_list,
1626                                                        &pnn);
1627                 if (ret2 != 0) {
1628                         LOG("control FREEZE_DB failed for db %s on node %u,"
1629                             " ret=%d\n", state->db_name, pnn, ret2);
1630                 } else {
1631                         LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1632                             state->db_name, ret);
1633                 }
1634                 tevent_req_error(req, ret);
1635                 return;
1636         }
1637
1638         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1639         subreq = ctdb_client_control_multi_send(state, state->ev,
1640                                                 state->client,
1641                                                 state->pnn_list, state->count,
1642                                                 TIMEOUT(), &request);
1643         if (tevent_req_nomem(subreq, req)) {
1644                 return;
1645         }
1646         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1647 }
1648
1649 static void recover_db_transaction_started(struct tevent_req *subreq)
1650 {
1651         struct tevent_req *req = tevent_req_callback_data(
1652                 subreq, struct tevent_req);
1653         struct recover_db_state *state = tevent_req_data(
1654                 req, struct recover_db_state);
1655         int *err_list;
1656         int ret;
1657         bool status;
1658
1659         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1660                                                 NULL);
1661         TALLOC_FREE(subreq);
1662         if (! status) {
1663                 int ret2;
1664                 uint32_t pnn;
1665
1666                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1667                                                        state->count,
1668                                                        err_list, &pnn);
1669                 if (ret2 != 0) {
1670                         LOG("control TRANSACTION_DB failed for db=%s on node %u,"
1671                             " ret=%d\n", state->db_name, pnn, ret2);
1672                 } else {
1673                         LOG("control TRANSACTION_DB failed for db=%s,"
1674                             " ret=%d\n", state->db_name, ret);
1675                 }
1676                 tevent_req_error(req, ret);
1677                 return;
1678         }
1679
1680         state->recdb = recdb_create(state, state->db_id, state->db_name,
1681                                     state->db_path,
1682                                     state->tun_list->database_hash_size,
1683                                     state->persistent);
1684         if (tevent_req_nomem(state->recdb, req)) {
1685                 return;
1686         }
1687
1688         if (state->persistent) {
1689                 subreq = collect_highseqnum_db_send(
1690                                 state, state->ev, state->client,
1691                                 state->pnn_list, state->count, state->caps,
1692                                 state->ban_credits, state->db_id,
1693                                 state->recdb);
1694         } else {
1695                 subreq = collect_all_db_send(
1696                                 state, state->ev, state->client,
1697                                 state->pnn_list, state->count, state->caps,
1698                                 state->ban_credits, state->db_id,
1699                                 state->recdb);
1700         }
1701         if (tevent_req_nomem(subreq, req)) {
1702                 return;
1703         }
1704         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1705 }
1706
1707 static void recover_db_collect_done(struct tevent_req *subreq)
1708 {
1709         struct tevent_req *req = tevent_req_callback_data(
1710                 subreq, struct tevent_req);
1711         struct recover_db_state *state = tevent_req_data(
1712                 req, struct recover_db_state);
1713         struct ctdb_req_control request;
1714         int ret;
1715         bool status;
1716
1717         if (state->persistent) {
1718                 status = collect_highseqnum_db_recv(subreq, &ret);
1719         } else {
1720                 status = collect_all_db_recv(subreq, &ret);
1721         }
1722         TALLOC_FREE(subreq);
1723         if (! status) {
1724                 tevent_req_error(req, ret);
1725                 return;
1726         }
1727
1728         ctdb_req_control_wipe_database(&request, &state->transdb);
1729         subreq = ctdb_client_control_multi_send(state, state->ev,
1730                                                 state->client,
1731                                                 state->pnn_list, state->count,
1732                                                 TIMEOUT(), &request);
1733         if (tevent_req_nomem(subreq, req)) {
1734                 return;
1735         }
1736         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1737 }
1738
1739 static void recover_db_wipedb_done(struct tevent_req *subreq)
1740 {
1741         struct tevent_req *req = tevent_req_callback_data(
1742                 subreq, struct tevent_req);
1743         struct recover_db_state *state = tevent_req_data(
1744                 req, struct recover_db_state);
1745         int *err_list;
1746         int ret;
1747         bool status;
1748
1749         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1750                                                 NULL);
1751         TALLOC_FREE(subreq);
1752         if (! status) {
1753                 int ret2;
1754                 uint32_t pnn;
1755
1756                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1757                                                        state->count,
1758                                                        err_list, &pnn);
1759                 if (ret2 != 0) {
1760                         LOG("control WIPEDB failed for db %s on node %u,"
1761                             " ret=%d\n", state->db_name, pnn, ret2);
1762                 } else {
1763                         LOG("control WIPEDB failed for db %s, ret=%d\n",
1764                             state->db_name, ret);
1765                 }
1766                 tevent_req_error(req, ret);
1767                 return;
1768         }
1769
1770         subreq = push_database_send(state, state->ev, state->client,
1771                                     state->pnn_list, state->count,
1772                                     state->caps, state->tun_list,
1773                                     state->recdb);
1774         if (tevent_req_nomem(subreq, req)) {
1775                 return;
1776         }
1777         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1778 }
1779
1780 static void recover_db_pushdb_done(struct tevent_req *subreq)
1781 {
1782         struct tevent_req *req = tevent_req_callback_data(
1783                 subreq, struct tevent_req);
1784         struct recover_db_state *state = tevent_req_data(
1785                 req, struct recover_db_state);
1786         struct ctdb_req_control request;
1787         int ret;
1788         bool status;
1789
1790         status = push_database_recv(subreq, &ret);
1791         TALLOC_FREE(subreq);
1792         if (! status) {
1793                 tevent_req_error(req, ret);
1794                 return;
1795         }
1796
1797         TALLOC_FREE(state->recdb);
1798
1799         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1800         subreq = ctdb_client_control_multi_send(state, state->ev,
1801                                                 state->client,
1802                                                 state->pnn_list, state->count,
1803                                                 TIMEOUT(), &request);
1804         if (tevent_req_nomem(subreq, req)) {
1805                 return;
1806         }
1807         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1808 }
1809
1810 static void recover_db_transaction_committed(struct tevent_req *subreq)
1811 {
1812         struct tevent_req *req = tevent_req_callback_data(
1813                 subreq, struct tevent_req);
1814         struct recover_db_state *state = tevent_req_data(
1815                 req, struct recover_db_state);
1816         struct ctdb_req_control request;
1817         int *err_list;
1818         int ret;
1819         bool status;
1820
1821         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1822                                                 NULL);
1823         TALLOC_FREE(subreq);
1824         if (! status) {
1825                 int ret2;
1826                 uint32_t pnn;
1827
1828                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1829                                                        state->count,
1830                                                        err_list, &pnn);
1831                 if (ret2 != 0) {
1832                         LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1833                             " on node %u, ret=%d\n", state->db_name, pnn, ret2);
1834                 } else {
1835                         LOG("control DB_TRANSACTION_COMMIT failed for db %s,"
1836                             " ret=%d\n", state->db_name, ret);
1837                 }
1838                 tevent_req_error(req, ret);
1839                 return;
1840         }
1841
1842         ctdb_req_control_db_thaw(&request, state->db_id);
1843         subreq = ctdb_client_control_multi_send(state, state->ev,
1844                                                 state->client,
1845                                                 state->pnn_list, state->count,
1846                                                 TIMEOUT(), &request);
1847         if (tevent_req_nomem(subreq, req)) {
1848                 return;
1849         }
1850         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1851 }
1852
1853 static void recover_db_thaw_done(struct tevent_req *subreq)
1854 {
1855         struct tevent_req *req = tevent_req_callback_data(
1856                 subreq, struct tevent_req);
1857         struct recover_db_state *state = tevent_req_data(
1858                 req, struct recover_db_state);
1859         int *err_list;
1860         int ret;
1861         bool status;
1862
1863         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1864                                                 NULL);
1865         TALLOC_FREE(subreq);
1866         if (! status) {
1867                 int ret2;
1868                 uint32_t pnn;
1869
1870                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1871                                                        state->count,
1872                                                        err_list, &pnn);
1873                 if (ret2 != 0) {
1874                         LOG("control DB_THAW failed for db %s on node %u,"
1875                             " ret=%d\n", state->db_name, pnn, ret2);
1876                 } else {
1877                         LOG("control DB_THAW failed for db %s, ret=%d\n",
1878                             state->db_name, ret);
1879                 }
1880                 tevent_req_error(req, ret);
1881                 return;
1882         }
1883
1884         tevent_req_done(req);
1885 }
1886
/*
 * Collect the result of a recover_db request.
 *
 * Returns true when the request finished without error, false
 * otherwise.  The error code itself is not reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
        int unused_err;

        if (tevent_req_is_unix_error(req, &unused_err)) {
                return false;
        }

        return true;
}
1891
1892
/*
 * Start database recovery for each database
 *
 * Try to recover each database up to NUM_RETRIES times before failing
 * recovery.
 */
1898
/* Shared state of a db_recovery request covering all databases */
struct db_recovery_state {
        /* Event context, reused when a database recovery is retried */
        struct tevent_context *ev;
        /* Databases to recover; dbmap->num is the number of replies awaited */
        struct ctdb_dbid_map *dbmap;
        /* Databases that have finished, successfully or not */
        int num_replies;
        /* Databases whose recovery was given up on */
        int num_failed;
};
1905
/*
 * Per-database state of a db_recovery request.
 *
 * It keeps the arguments of recover_db_send() so that a failed attempt
 * can be repeated with exactly the same parameters.
 */
struct db_recovery_one_state {
        /* The enclosing db_recovery request */
        struct tevent_req *req;
        struct ctdb_client_context *client;
        struct ctdb_dbid_map *dbmap;
        struct ctdb_tunable_list *tun_list;
        /* Nodes taking part in the recovery and how many there are */
        uint32_t *pnn_list;
        int count;
        uint32_t *caps;
        uint32_t *ban_credits;
        uint32_t generation;
        /* The database handled by this state */
        uint32_t db_id;
        /* Set when the database has CTDB_DB_FLAGS_PERSISTENT */
        bool persistent;
        /* Number of recovery attempts that have failed so far */
        int num_fails;
};
1920
/* Completion callback for the recovery attempt of a single database */
static void db_recovery_one_done(struct tevent_req *subreq);
1922
1923 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1924                                            struct tevent_context *ev,
1925                                            struct ctdb_client_context *client,
1926                                            struct ctdb_dbid_map *dbmap,
1927                                            struct ctdb_tunable_list *tun_list,
1928                                            uint32_t *pnn_list, int count,
1929                                            uint32_t *caps,
1930                                            uint32_t *ban_credits,
1931                                            uint32_t generation)
1932 {
1933         struct tevent_req *req, *subreq;
1934         struct db_recovery_state *state;
1935         int i;
1936
1937         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1938         if (req == NULL) {
1939                 return NULL;
1940         }
1941
1942         state->ev = ev;
1943         state->dbmap = dbmap;
1944         state->num_replies = 0;
1945         state->num_failed = 0;
1946
1947         if (dbmap->num == 0) {
1948                 tevent_req_done(req);
1949                 return tevent_req_post(req, ev);
1950         }
1951
1952         for (i=0; i<dbmap->num; i++) {
1953                 struct db_recovery_one_state *substate;
1954
1955                 substate = talloc_zero(state, struct db_recovery_one_state);
1956                 if (tevent_req_nomem(substate, req)) {
1957                         return tevent_req_post(req, ev);
1958                 }
1959
1960                 substate->req = req;
1961                 substate->client = client;
1962                 substate->dbmap = dbmap;
1963                 substate->tun_list = tun_list;
1964                 substate->pnn_list = pnn_list;
1965                 substate->count = count;
1966                 substate->caps = caps;
1967                 substate->ban_credits = ban_credits;
1968                 substate->generation = generation;
1969                 substate->db_id = dbmap->dbs[i].db_id;
1970                 substate->persistent = dbmap->dbs[i].flags &
1971                                        CTDB_DB_FLAGS_PERSISTENT;
1972
1973                 subreq = recover_db_send(state, ev, client, tun_list,
1974                                          pnn_list, count, caps, ban_credits,
1975                                          generation, substate->db_id,
1976                                          substate->persistent);
1977                 if (tevent_req_nomem(subreq, req)) {
1978                         return tevent_req_post(req, ev);
1979                 }
1980                 tevent_req_set_callback(subreq, db_recovery_one_done,
1981                                         substate);
1982                 LOG("recover database 0x%08x\n", substate->db_id);
1983         }
1984
1985         return req;
1986 }
1987
1988 static void db_recovery_one_done(struct tevent_req *subreq)
1989 {
1990         struct db_recovery_one_state *substate = tevent_req_callback_data(
1991                 subreq, struct db_recovery_one_state);
1992         struct tevent_req *req = substate->req;
1993         struct db_recovery_state *state = tevent_req_data(
1994                 req, struct db_recovery_state);
1995         bool status;
1996
1997         status = recover_db_recv(subreq);
1998         TALLOC_FREE(subreq);
1999
2000         if (status) {
2001                 talloc_free(substate);
2002                 goto done;
2003         }
2004
2005         substate->num_fails += 1;
2006         if (substate->num_fails < NUM_RETRIES) {
2007                 subreq = recover_db_send(state, state->ev, substate->client,
2008                                          substate->tun_list,
2009                                          substate->pnn_list, substate->count,
2010                                          substate->caps, substate->ban_credits,
2011                                          substate->generation, substate->db_id,
2012                                          substate->persistent);
2013                 if (tevent_req_nomem(subreq, req)) {
2014                         goto failed;
2015                 }
2016                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2017                 LOG("recover database 0x%08x, attempt %d\n", substate->db_id,
2018                     substate->num_fails+1);
2019                 return;
2020         }
2021
2022 failed:
2023         state->num_failed += 1;
2024
2025 done:
2026         state->num_replies += 1;
2027
2028         if (state->num_replies == state->dbmap->num) {
2029                 tevent_req_done(req);
2030         }
2031 }
2032
2033 static bool db_recovery_recv(struct tevent_req *req, int *count)
2034 {
2035         struct db_recovery_state *state = tevent_req_data(
2036                 req, struct db_recovery_state);
2037         int err;
2038
2039         if (tevent_req_is_unix_error(req, &err)) {
2040                 *count = 0;
2041                 return false;
2042         }
2043
2044         *count = state->num_replies - state->num_failed;
2045
2046         if (state->num_failed > 0) {
2047                 return false;
2048         }
2049
2050         return true;
2051 }
2052
2053
2054 /*
2055  * Run the parallel database recovery
2056  *
2057  * - Get tunables
2058  * - Get nodemap
2059  * - Get vnnmap
2060  * - Get capabilities from all nodes
2061  * - Get dbmap
2062  * - Set RECOVERY_ACTIVE
2063  * - Send START_RECOVERY
2064  * - Update vnnmap on all nodes
2065  * - Run database recovery
2066  * - Set RECOVERY_NORMAL
2067  * - Send END_RECOVERY
2068  */
2069
/* State of the overall recovery request */
struct recovery_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Generation assigned to the new VNNMAP */
        uint32_t generation;
        /* Active nodes, as returned by list_of_active_nodes(), and their number */
        uint32_t *pnn_list;
        int count;
        /* PNN reported by ctdb_client_pnn(); target of the single-node controls */
        uint32_t destnode;
        struct ctdb_node_map *nodemap;
        /* Capabilities, nodemap->num entries, filled in by node PNN */
        uint32_t *caps;
        /* Zero-initialised array with nodemap->num entries */
        uint32_t *ban_credits;
        struct ctdb_tunable_list *tun_list;
        /* Fetched VNNMAP, later replaced by the newly calculated one */
        struct ctdb_vnn_map *vnnmap;
        struct ctdb_dbid_map *dbmap;
};
2084
/* Completion callbacks for the individual steps of the recovery request */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2097
2098 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2099                                         struct tevent_context *ev,
2100                                         struct ctdb_client_context *client,
2101                                         uint32_t generation)
2102 {
2103         struct tevent_req *req, *subreq;
2104         struct recovery_state *state;
2105         struct ctdb_req_control request;
2106
2107         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2108         if (req == NULL) {
2109                 return NULL;
2110         }
2111
2112         state->ev = ev;
2113         state->client = client;
2114         state->generation = generation;
2115         state->destnode = ctdb_client_pnn(client);
2116
2117         ctdb_req_control_get_all_tunables(&request);
2118         subreq = ctdb_client_control_send(state, state->ev, state->client,
2119                                           state->destnode, TIMEOUT(),
2120                                           &request);
2121         if (tevent_req_nomem(subreq, req)) {
2122                 return tevent_req_post(req, ev);
2123         }
2124         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2125
2126         return req;
2127 }
2128
2129 static void recovery_tunables_done(struct tevent_req *subreq)
2130 {
2131         struct tevent_req *req = tevent_req_callback_data(
2132                 subreq, struct tevent_req);
2133         struct recovery_state *state = tevent_req_data(
2134                 req, struct recovery_state);
2135         struct ctdb_reply_control *reply;
2136         struct ctdb_req_control request;
2137         int ret;
2138         bool status;
2139
2140         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2141         TALLOC_FREE(subreq);
2142         if (! status) {
2143                 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2144                 tevent_req_error(req, ret);
2145                 return;
2146         }
2147
2148         ret = ctdb_reply_control_get_all_tunables(reply, state,
2149                                                   &state->tun_list);
2150         if (ret != 0) {
2151                 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2152                 tevent_req_error(req, EPROTO);
2153                 return;
2154         }
2155
2156         talloc_free(reply);
2157
2158         recover_timeout = state->tun_list->recover_timeout;
2159
2160         ctdb_req_control_get_nodemap(&request);
2161         subreq = ctdb_client_control_send(state, state->ev, state->client,
2162                                           state->destnode, TIMEOUT(),
2163                                           &request);
2164         if (tevent_req_nomem(subreq, req)) {
2165                 return;
2166         }
2167         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2168 }
2169
2170 static void recovery_nodemap_done(struct tevent_req *subreq)
2171 {
2172         struct tevent_req *req = tevent_req_callback_data(
2173                 subreq, struct tevent_req);
2174         struct recovery_state *state = tevent_req_data(
2175                 req, struct recovery_state);
2176         struct ctdb_reply_control *reply;
2177         struct ctdb_req_control request;
2178         bool status;
2179         int ret;
2180
2181         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2182         TALLOC_FREE(subreq);
2183         if (! status) {
2184                 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2185                     state->destnode, ret);
2186                 tevent_req_error(req, ret);
2187                 return;
2188         }
2189
2190         ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2191         if (ret != 0) {
2192                 LOG("control GET_NODEMAP failed, ret=%d\n", ret);
2193                 tevent_req_error(req, ret);
2194                 return;
2195         }
2196
2197         state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2198                                             state, &state->pnn_list);
2199         if (state->count <= 0) {
2200                 tevent_req_error(req, ENOMEM);
2201                 return;
2202         }
2203
2204         state->ban_credits = talloc_zero_array(state, uint32_t,
2205                                                state->nodemap->num);
2206         if (tevent_req_nomem(state->ban_credits, req)) {
2207                 return;
2208         }
2209
2210         ctdb_req_control_getvnnmap(&request);
2211         subreq = ctdb_client_control_send(state, state->ev, state->client,
2212                                           state->destnode, TIMEOUT(),
2213                                           &request);
2214         if (tevent_req_nomem(subreq, req)) {
2215                 return;
2216         }
2217         tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2218 }
2219
2220 static void recovery_vnnmap_done(struct tevent_req *subreq)
2221 {
2222         struct tevent_req *req = tevent_req_callback_data(
2223                 subreq, struct tevent_req);
2224         struct recovery_state *state = tevent_req_data(
2225                 req, struct recovery_state);
2226         struct ctdb_reply_control *reply;
2227         struct ctdb_req_control request;
2228         bool status;
2229         int ret;
2230
2231         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2232         TALLOC_FREE(subreq);
2233         if (! status) {
2234                 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2235                     state->destnode, ret);
2236                 tevent_req_error(req, ret);
2237                 return;
2238         }
2239
2240         ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2241         if (ret != 0) {
2242                 LOG("control GETVNNMAP failed, ret=%d\n", ret);
2243                 tevent_req_error(req, ret);
2244                 return;
2245         }
2246
2247         ctdb_req_control_get_capabilities(&request);
2248         subreq = ctdb_client_control_multi_send(state, state->ev,
2249                                                 state->client,
2250                                                 state->pnn_list, state->count,
2251                                                 TIMEOUT(), &request);
2252         if (tevent_req_nomem(subreq, req)) {
2253                 return;
2254         }
2255         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2256 }
2257
2258 static void recovery_capabilities_done(struct tevent_req *subreq)
2259 {
2260         struct tevent_req *req = tevent_req_callback_data(
2261                 subreq, struct tevent_req);
2262         struct recovery_state *state = tevent_req_data(
2263                 req, struct recovery_state);
2264         struct ctdb_reply_control **reply;
2265         struct ctdb_req_control request;
2266         int *err_list;
2267         int ret, i;
2268         bool status;
2269
2270         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2271                                                 &reply);
2272         TALLOC_FREE(subreq);
2273         if (! status) {
2274                 int ret2;
2275                 uint32_t pnn;
2276
2277                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2278                                                        state->count,
2279                                                        err_list, &pnn);
2280                 if (ret2 != 0) {
2281                         LOG("control GET_CAPABILITIES failed on node %u,"
2282                             " ret=%d\n", pnn, ret2);
2283                 } else {
2284                         LOG("control GET_CAPABILITIES failed, ret=%d\n", ret);
2285                 }
2286                 tevent_req_error(req, ret);
2287                 return;
2288         }
2289
2290         /* Make the array size same as nodemap */
2291         state->caps = talloc_zero_array(state, uint32_t,
2292                                         state->nodemap->num);
2293         if (tevent_req_nomem(state->caps, req)) {
2294                 return;
2295         }
2296
2297         for (i=0; i<state->count; i++) {
2298                 uint32_t pnn;
2299
2300                 pnn = state->pnn_list[i];
2301                 ret = ctdb_reply_control_get_capabilities(reply[i],
2302                                                           &state->caps[pnn]);
2303                 if (ret != 0) {
2304                         LOG("control GET_CAPABILITIES failed on node %u\n", pnn);
2305                         tevent_req_error(req, EPROTO);
2306                         return;
2307                 }
2308         }
2309
2310         talloc_free(reply);
2311
2312         ctdb_req_control_get_dbmap(&request);
2313         subreq = ctdb_client_control_send(state, state->ev, state->client,
2314                                           state->destnode, TIMEOUT(),
2315                                           &request);
2316         if (tevent_req_nomem(subreq, req)) {
2317                 return;
2318         }
2319         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2320 }
2321
2322 static void recovery_dbmap_done(struct tevent_req *subreq)
2323 {
2324         struct tevent_req *req = tevent_req_callback_data(
2325                 subreq, struct tevent_req);
2326         struct recovery_state *state = tevent_req_data(
2327                 req, struct recovery_state);
2328         struct ctdb_reply_control *reply;
2329         struct ctdb_req_control request;
2330         int ret;
2331         bool status;
2332
2333         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2334         TALLOC_FREE(subreq);
2335         if (! status) {
2336                 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2337                     state->destnode, ret);
2338                 tevent_req_error(req, ret);
2339                 return;
2340         }
2341
2342         ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2343         if (ret != 0) {
2344                 LOG("control GET_DBMAP failed, ret=%d\n", ret);
2345                 tevent_req_error(req, ret);
2346                 return;
2347         }
2348
2349         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2350         subreq = ctdb_client_control_multi_send(state, state->ev,
2351                                                 state->client,
2352                                                 state->pnn_list, state->count,
2353                                                 TIMEOUT(), &request);
2354         if (tevent_req_nomem(subreq, req)) {
2355                 return;
2356         }
2357         tevent_req_set_callback(subreq, recovery_active_done, req);
2358 }
2359
2360 static void recovery_active_done(struct tevent_req *subreq)
2361 {
2362         struct tevent_req *req = tevent_req_callback_data(
2363                 subreq, struct tevent_req);
2364         struct recovery_state *state = tevent_req_data(
2365                 req, struct recovery_state);
2366         struct ctdb_req_control request;
2367         struct ctdb_vnn_map *vnnmap;
2368         int *err_list;
2369         int ret, count, i;
2370         bool status;
2371
2372         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2373                                                 NULL);
2374         TALLOC_FREE(subreq);
2375         if (! status) {
2376                 int ret2;
2377                 uint32_t pnn;
2378
2379                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2380                                                        state->count,
2381                                                        err_list, &pnn);
2382                 if (ret2 != 0) {
2383                         LOG("failed to set recovery mode to ACTIVE on node %u,"
2384                             " ret=%d\n", pnn, ret2);
2385                 } else {
2386                         LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2387                             ret);
2388                 }
2389                 tevent_req_error(req, ret);
2390                 return;
2391         }
2392
2393         LOG("set recovery mode to ACTIVE\n");
2394
2395         /* Calculate new VNNMAP */
2396         count = 0;
2397         for (i=0; i<state->nodemap->num; i++) {
2398                 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2399                         continue;
2400                 }
2401                 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2402                         continue;
2403                 }
2404                 count += 1;
2405         }
2406
2407         if (count == 0) {
2408                 LOG("no active lmasters found. Adding recmaster anyway\n");
2409         }
2410
2411         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2412         if (tevent_req_nomem(vnnmap, req)) {
2413                 return;
2414         }
2415
2416         vnnmap->size = (count == 0 ? 1 : count);
2417         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2418         if (tevent_req_nomem(vnnmap->map, req)) {
2419                 return;
2420         }
2421
2422         if (count == 0) {
2423                 vnnmap->map[0] = state->destnode;
2424         } else {
2425                 count = 0;
2426                 for (i=0; i<state->nodemap->num; i++) {
2427                         if (state->nodemap->node[i].flags &
2428                             NODE_FLAGS_INACTIVE) {
2429                                 continue;
2430                         }
2431                         if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2432                                 continue;
2433                         }
2434
2435                         vnnmap->map[count] = state->nodemap->node[i].pnn;
2436                         count += 1;
2437                 }
2438         }
2439
2440         vnnmap->generation = state->generation;
2441
2442         talloc_free(state->vnnmap);
2443         state->vnnmap = vnnmap;
2444
2445         ctdb_req_control_start_recovery(&request);
2446         subreq = ctdb_client_control_multi_send(state, state->ev,
2447                                                 state->client,
2448                                                 state->pnn_list, state->count,
2449                                                 TIMEOUT(), &request);
2450         if (tevent_req_nomem(subreq, req)) {
2451                 return;
2452         }
2453         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2454 }
2455
2456 static void recovery_start_recovery_done(struct tevent_req *subreq)
2457 {
2458         struct tevent_req *req = tevent_req_callback_data(
2459                 subreq, struct tevent_req);
2460         struct recovery_state *state = tevent_req_data(
2461                 req, struct recovery_state);
2462         struct ctdb_req_control request;
2463         int *err_list;
2464         int ret;
2465         bool status;
2466
2467         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2468                                                 NULL);
2469         TALLOC_FREE(subreq);
2470         if (! status) {
2471                 int ret2;
2472                 uint32_t pnn;
2473
2474                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2475                                                        state->count,
2476                                                        err_list, &pnn);
2477                 if (ret2 != 0) {
2478                         LOG("failed to run start_recovery event on node %u,"
2479                             " ret=%d\n", pnn, ret2);
2480                 } else {
2481                         LOG("failed to run start_recovery event, ret=%d\n",
2482                             ret);
2483                 }
2484                 tevent_req_error(req, ret);
2485                 return;
2486         }
2487
2488         LOG("start_recovery event finished\n");
2489
2490         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2491         subreq = ctdb_client_control_multi_send(state, state->ev,
2492                                                 state->client,
2493                                                 state->pnn_list, state->count,
2494                                                 TIMEOUT(), &request);
2495         if (tevent_req_nomem(subreq, req)) {
2496                 return;
2497         }
2498         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2499 }
2500
2501 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2502 {
2503         struct tevent_req *req = tevent_req_callback_data(
2504                 subreq, struct tevent_req);
2505         struct recovery_state *state = tevent_req_data(
2506                 req, struct recovery_state);
2507         int *err_list;
2508         int ret;
2509         bool status;
2510
2511         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2512                                                 NULL);
2513         TALLOC_FREE(subreq);
2514         if (! status) {
2515                 int ret2;
2516                 uint32_t pnn;
2517
2518                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2519                                                        state->count,
2520                                                        err_list, &pnn);
2521                 if (ret2 != 0) {
2522                         LOG("failed to update VNNMAP on node %u, ret=%d\n",
2523                             pnn, ret2);
2524                 } else {
2525                         LOG("failed to update VNNMAP, ret=%d\n", ret);
2526                 }
2527                 tevent_req_error(req, ret);
2528                 return;
2529         }
2530
2531         LOG("updated VNNMAP\n");
2532
2533         subreq = db_recovery_send(state, state->ev, state->client,
2534                                   state->dbmap, state->tun_list,
2535                                   state->pnn_list, state->count,
2536                                   state->caps, state->ban_credits,
2537                                   state->vnnmap->generation);
2538         if (tevent_req_nomem(subreq, req)) {
2539                 return;
2540         }
2541         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2542 }
2543
2544 static void recovery_db_recovery_done(struct tevent_req *subreq)
2545 {
2546         struct tevent_req *req = tevent_req_callback_data(
2547                 subreq, struct tevent_req);
2548         struct recovery_state *state = tevent_req_data(
2549                 req, struct recovery_state);
2550         struct ctdb_req_control request;
2551         bool status;
2552         int count;
2553
2554         status = db_recovery_recv(subreq, &count);
2555         TALLOC_FREE(subreq);
2556
2557         LOG("%d of %d databases recovered\n", count, state->dbmap->num);
2558
2559         if (! status) {
2560                 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2561                 int i;
2562
2563                 /* Bans are not enabled */
2564                 if (state->tun_list->enable_bans == 0) {
2565                         tevent_req_error(req, EIO);
2566                         return;
2567                 }
2568
2569                 for (i=0; i<state->count; i++) {
2570                         uint32_t pnn;
2571                         pnn = state->pnn_list[i];
2572                         if (state->ban_credits[pnn] > max_credits) {
2573                                 max_pnn = pnn;
2574                                 max_credits = state->ban_credits[pnn];
2575                         }
2576                 }
2577
2578                 /* If pulling database fails multiple times */
2579                 if (max_credits >= NUM_RETRIES) {
2580                         struct ctdb_req_message message;
2581
2582                         LOG("Assigning banning credits to node %u\n", max_pnn);
2583
2584                         message.srvid = CTDB_SRVID_BANNING;
2585                         message.data.pnn = max_pnn;
2586
2587                         subreq = ctdb_client_message_send(
2588                                         state, state->ev, state->client,
2589                                         ctdb_client_pnn(state->client),
2590                                         &message);
2591                         if (tevent_req_nomem(subreq, req)) {
2592                                 return;
2593                         }
2594                         tevent_req_set_callback(subreq, recovery_failed_done,
2595                                                 req);
2596                 } else {
2597                         tevent_req_error(req, EIO);
2598                 }
2599                 return;
2600         }
2601
2602         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2603         subreq = ctdb_client_control_multi_send(state, state->ev,
2604                                                 state->client,
2605                                                 state->pnn_list, state->count,
2606                                                 TIMEOUT(), &request);
2607         if (tevent_req_nomem(subreq, req)) {
2608                 return;
2609         }
2610         tevent_req_set_callback(subreq, recovery_normal_done, req);
2611 }
2612
2613 static void recovery_failed_done(struct tevent_req *subreq)
2614 {
2615         struct tevent_req *req = tevent_req_callback_data(
2616                 subreq, struct tevent_req);
2617         int ret;
2618         bool status;
2619
2620         status = ctdb_client_message_recv(subreq, &ret);
2621         TALLOC_FREE(subreq);
2622         if (! status) {
2623                 LOG("failed to assign banning credits, ret=%d\n", ret);
2624         }
2625
2626         tevent_req_error(req, EIO);
2627 }
2628
2629 static void recovery_normal_done(struct tevent_req *subreq)
2630 {
2631         struct tevent_req *req = tevent_req_callback_data(
2632                 subreq, struct tevent_req);
2633         struct recovery_state *state = tevent_req_data(
2634                 req, struct recovery_state);
2635         struct ctdb_req_control request;
2636         int *err_list;
2637         int ret;
2638         bool status;
2639
2640         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2641                                                 NULL);
2642         TALLOC_FREE(subreq);
2643         if (! status) {
2644                 int ret2;
2645                 uint32_t pnn;
2646
2647                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2648                                                        state->count,
2649                                                        err_list, &pnn);
2650                 if (ret2 != 0) {
2651                         LOG("failed to set recovery mode to NORMAL on node %u,"
2652                             " ret=%d\n", pnn, ret2);
2653                 } else {
2654                         LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2655                             ret);
2656                 }
2657                 tevent_req_error(req, ret);
2658                 return;
2659         }
2660
2661         LOG("set recovery mode to NORMAL\n");
2662
2663         ctdb_req_control_end_recovery(&request);
2664         subreq = ctdb_client_control_multi_send(state, state->ev,
2665                                                 state->client,
2666                                                 state->pnn_list, state->count,
2667                                                 TIMEOUT(), &request);
2668         if (tevent_req_nomem(subreq, req)) {
2669                 return;
2670         }
2671         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2672 }
2673
2674 static void recovery_end_recovery_done(struct tevent_req *subreq)
2675 {
2676         struct tevent_req *req = tevent_req_callback_data(
2677                 subreq, struct tevent_req);
2678         struct recovery_state *state = tevent_req_data(
2679                 req, struct recovery_state);
2680         int *err_list;
2681         int ret;
2682         bool status;
2683
2684         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2685                                                 NULL);
2686         TALLOC_FREE(subreq);
2687         if (! status) {
2688                 int ret2;
2689                 uint32_t pnn;
2690
2691                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2692                                                        state->count,
2693                                                        err_list, &pnn);
2694                 if (ret2 != 0) {
2695                         LOG("failed to run recovered event on node %u,"
2696                             " ret=%d\n", pnn, ret2);
2697                 } else {
2698                         LOG("failed to run recovered event, ret=%d\n", ret);
2699                 }
2700                 tevent_req_error(req, ret);
2701                 return;
2702         }
2703
2704         LOG("recovered event finished\n");
2705
2706         tevent_req_done(req);
2707 }
2708
/*
 * Collect the result of a recovery request.
 *
 * @param req   the request created by recovery_send()
 * @param perr  if not NULL, receives the error the request failed with,
 *              or 0 when the request completed successfully
 *
 * generic_recv() only stores a value in *perr when the request failed, so
 * the success case is written here explicitly; otherwise the caller would
 * be left examining whatever its variable happened to contain beforehand.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        if (! generic_recv(req, perr)) {
                return;
        }

        if (perr != NULL) {
                *perr = 0;
        }
}
2713
/*
 * Print the command line synopsis for this helper on stderr.
 *
 * progname is the name shown as the program in the synopsis.
 */
static void usage(const char *progname)
{
        fprintf(stderr,
                "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
                progname);
}
2719
2720
2721 /*
2722  * Arguments - log fd, write fd, socket path, generation
2723  */
2724 int main(int argc, char *argv[])
2725 {
2726         int write_fd;
2727         const char *sockpath;
2728         TALLOC_CTX *mem_ctx;
2729         struct tevent_context *ev;
2730         struct ctdb_client_context *client;
2731         int ret;
2732         struct tevent_req *req;
2733         uint32_t generation;
2734
2735         if (argc != 4) {
2736                 usage(argv[0]);
2737                 exit(1);
2738         }
2739
2740         write_fd = atoi(argv[1]);
2741         sockpath = argv[2];
2742         generation = (uint32_t)strtoul(argv[3], NULL, 0);
2743
2744         mem_ctx = talloc_new(NULL);
2745         if (mem_ctx == NULL) {
2746                 fprintf(stderr, "recovery: talloc_new() failed\n");
2747                 goto failed;
2748         }
2749
2750         ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2751         if (ret != 0) {
2752                 fprintf(stderr, "recovery: Unable to initialize logging\n");
2753                 goto failed;
2754         }
2755
2756         ev = tevent_context_init(mem_ctx);
2757         if (ev == NULL) {
2758                 LOG("tevent_context_init() failed\n");
2759                 goto failed;
2760         }
2761
2762         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2763         if (ret != 0) {
2764                 LOG("ctdb_client_init() failed, ret=%d\n", ret);
2765                 goto failed;
2766         }
2767
2768         req = recovery_send(mem_ctx, ev, client, generation);
2769         if (req == NULL) {
2770                 LOG("database_recover_send() failed\n");
2771                 goto failed;
2772         }
2773
2774         if (! tevent_req_poll(req, ev)) {
2775                 LOG("tevent_req_poll() failed\n");
2776                 goto failed;
2777         }
2778
2779         recovery_recv(req, &ret);
2780         TALLOC_FREE(req);
2781         if (ret != 0) {
2782                 LOG("database recovery failed, ret=%d\n", ret);
2783                 goto failed;
2784         }
2785
2786         sys_write(write_fd, &ret, sizeof(ret));
2787         return 0;
2788
2789 failed:
2790         TALLOC_FREE(mem_ctx);
2791         return 1;
2792 }