b882da0a134ba9e531553286c1de4f22492c20e3
[sfrench/samba-autobuild/.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
33
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
37
38 #include "common/logging.h"
39
/* Offset, in seconds, that TIMEOUT() adds to the current time */
static int recover_timeout = 30;

#define NUM_RETRIES     3

/* Deadline for a single request: now + recover_timeout seconds */
#define TIMEOUT()       (timeval_current_ofs(recover_timeout, 0))
45
46 /*
47  * Utility functions
48  */
49
/*
 * Shared receive helper for the tevent requests in this file.
 *
 * Returns true when req did not finish with a unix error.  Otherwise returns
 * false and, when perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
63
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
65
66 static uint64_t srvid_next(void)
67 {
68         rec_srvid += 1;
69         return rec_srvid;
70 }
71
72 /*
73  * Recovery database functions
74  */
75
76 struct recdb_context {
77         uint32_t db_id;
78         const char *db_name;
79         const char *db_path;
80         struct tdb_wrap *db;
81         bool persistent;
82 };
83
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
85                                           const char *db_name,
86                                           const char *db_path,
87                                           uint32_t hash_size, bool persistent)
88 {
89         static char *db_dir_state = NULL;
90         struct recdb_context *recdb;
91         unsigned int tdb_flags;
92
93         recdb = talloc(mem_ctx, struct recdb_context);
94         if (recdb == NULL) {
95                 return NULL;
96         }
97
98         if (db_dir_state == NULL) {
99                 db_dir_state = getenv("CTDB_DBDIR_STATE");
100         }
101
102         recdb->db_name = db_name;
103         recdb->db_id = db_id;
104         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105                                          db_dir_state != NULL ?
106                                             db_dir_state :
107                                             dirname(discard_const(db_path)),
108                                          db_name);
109         if (recdb->db_path == NULL) {
110                 talloc_free(recdb);
111                 return NULL;
112         }
113         unlink(recdb->db_path);
114
115         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
116         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118         if (recdb->db == NULL) {
119                 talloc_free(recdb);
120                 D_ERR("failed to create recovery db %s\n", recdb->db_path);
121                 return NULL;
122         }
123
124         recdb->persistent = persistent;
125
126         return recdb;
127 }
128
129 static uint32_t recdb_id(struct recdb_context *recdb)
130 {
131         return recdb->db_id;
132 }
133
134 static const char *recdb_name(struct recdb_context *recdb)
135 {
136         return recdb->db_name;
137 }
138
139 static const char *recdb_path(struct recdb_context *recdb)
140 {
141         return recdb->db_path;
142 }
143
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
145 {
146         return recdb->db->tdb;
147 }
148
149 static bool recdb_persistent(struct recdb_context *recdb)
150 {
151         return recdb->persistent;
152 }
153
154 struct recdb_add_traverse_state {
155         struct recdb_context *recdb;
156         int mypnn;
157 };
158
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160                               TDB_DATA key, TDB_DATA data,
161                               void *private_data)
162 {
163         struct recdb_add_traverse_state *state =
164                 (struct recdb_add_traverse_state *)private_data;
165         struct ctdb_ltdb_header *hdr;
166         TDB_DATA prev_data;
167         int ret;
168
169         /* header is not marshalled separately in the pulldb control */
170         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
171                 return -1;
172         }
173
174         hdr = (struct ctdb_ltdb_header *)data.dptr;
175
176         /* fetch the existing record, if any */
177         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
178
179         if (prev_data.dptr != NULL) {
180                 struct ctdb_ltdb_header prev_hdr;
181
182                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183                 free(prev_data.dptr);
184                 if (hdr->rsn < prev_hdr.rsn ||
185                     (hdr->rsn == prev_hdr.rsn &&
186                      prev_hdr.dmaster != state->mypnn)) {
187                         return 0;
188                 }
189         }
190
191         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
192         if (ret != 0) {
193                 return -1;
194         }
195         return 0;
196 }
197
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199                       struct ctdb_rec_buffer *recbuf)
200 {
201         struct recdb_add_traverse_state state;
202         int ret;
203
204         state.recdb = recdb;
205         state.mypnn = mypnn;
206
207         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
208         if (ret != 0) {
209                 return false;
210         }
211
212         return true;
213 }
214
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217                              uint32_t reqid, uint32_t dmaster,
218                              TDB_DATA key, TDB_DATA data)
219 {
220         struct ctdb_ltdb_header *header;
221         int ret;
222
223         /* Skip empty records */
224         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
225                 return 0;
226         }
227
228         /* update the dmaster field to point to us */
229         header = (struct ctdb_ltdb_header *)data.dptr;
230         if (!persistent) {
231                 header->dmaster = dmaster;
232                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
233         }
234
235         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
236         if (ret != 0) {
237                 return ret;
238         }
239
240         return 0;
241 }
242
243 struct recdb_records_traverse_state {
244         struct ctdb_rec_buffer *recbuf;
245         uint32_t dmaster;
246         uint32_t reqid;
247         bool persistent;
248         bool failed;
249 };
250
251 static int recdb_records_traverse(struct tdb_context *tdb,
252                                   TDB_DATA key, TDB_DATA data,
253                                   void *private_data)
254 {
255         struct recdb_records_traverse_state *state =
256                 (struct recdb_records_traverse_state *)private_data;
257         int ret;
258
259         ret = recbuf_filter_add(state->recbuf, state->persistent,
260                                 state->reqid, state->dmaster, key, data);
261         if (ret != 0) {
262                 state->failed = true;
263                 return ret;
264         }
265
266         return 0;
267 }
268
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
270                                              TALLOC_CTX *mem_ctx,
271                                              uint32_t dmaster)
272 {
273         struct recdb_records_traverse_state state;
274         int ret;
275
276         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277         if (state.recbuf == NULL) {
278                 return NULL;
279         }
280         state.dmaster = dmaster;
281         state.reqid = 0;
282         state.persistent = recdb_persistent(recdb);
283         state.failed = false;
284
285         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
286                                 &state);
287         if (ret == -1 || state.failed) {
288                 D_ERR("Failed to marshall recovery records for %s\n",
289                       recdb_name(recdb));
290                 TALLOC_FREE(state.recbuf);
291                 return NULL;
292         }
293
294         return state.recbuf;
295 }
296
297 struct recdb_file_traverse_state {
298         struct ctdb_rec_buffer *recbuf;
299         struct recdb_context *recdb;
300         TALLOC_CTX *mem_ctx;
301         uint32_t dmaster;
302         uint32_t reqid;
303         bool persistent;
304         bool failed;
305         int fd;
306         int max_size;
307         int num_buffers;
308 };
309
310 static int recdb_file_traverse(struct tdb_context *tdb,
311                                TDB_DATA key, TDB_DATA data,
312                                void *private_data)
313 {
314         struct recdb_file_traverse_state *state =
315                 (struct recdb_file_traverse_state *)private_data;
316         int ret;
317
318         ret = recbuf_filter_add(state->recbuf, state->persistent,
319                                 state->reqid, state->dmaster, key, data);
320         if (ret != 0) {
321                 state->failed = true;
322                 return ret;
323         }
324
325         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
327                 if (ret != 0) {
328                         D_ERR("Failed to collect recovery records for %s\n",
329                               recdb_name(state->recdb));
330                         state->failed = true;
331                         return ret;
332                 }
333
334                 state->num_buffers += 1;
335
336                 TALLOC_FREE(state->recbuf);
337                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338                                                      recdb_id(state->recdb));
339                 if (state->recbuf == NULL) {
340                         state->failed = true;
341                         return ENOMEM;
342                 }
343         }
344
345         return 0;
346 }
347
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349                       uint32_t dmaster, int fd, int max_size)
350 {
351         struct recdb_file_traverse_state state;
352         int ret;
353
354         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355         if (state.recbuf == NULL) {
356                 return -1;
357         }
358         state.recdb = recdb;
359         state.mem_ctx = mem_ctx;
360         state.dmaster = dmaster;
361         state.reqid = 0;
362         state.persistent = recdb_persistent(recdb);
363         state.failed = false;
364         state.fd = fd;
365         state.max_size = max_size;
366         state.num_buffers = 0;
367
368         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369         if (ret == -1 || state.failed) {
370                 TALLOC_FREE(state.recbuf);
371                 return -1;
372         }
373
374         ret = ctdb_rec_buffer_write(state.recbuf, fd);
375         if (ret != 0) {
376                 D_ERR("Failed to collect recovery records for %s\n",
377                       recdb_name(recdb));
378                 TALLOC_FREE(state.recbuf);
379                 return -1;
380         }
381         state.num_buffers += 1;
382
383         D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384                 state.num_buffers, recdb_name(recdb));
385
386         return state.num_buffers;
387 }
388
389 /*
390  * Pull database from a single node
391  */
392
393 struct pull_database_state {
394         struct tevent_context *ev;
395         struct ctdb_client_context *client;
396         struct recdb_context *recdb;
397         uint32_t pnn;
398         uint64_t srvid;
399         int num_records;
400 };
401
402 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
403                                   void *private_data);
404 static void pull_database_register_done(struct tevent_req *subreq);
405 static void pull_database_old_done(struct tevent_req *subreq);
406 static void pull_database_unregister_done(struct tevent_req *subreq);
407 static void pull_database_new_done(struct tevent_req *subreq);
408
409 static struct tevent_req *pull_database_send(
410                         TALLOC_CTX *mem_ctx,
411                         struct tevent_context *ev,
412                         struct ctdb_client_context *client,
413                         uint32_t pnn, uint32_t caps,
414                         struct recdb_context *recdb)
415 {
416         struct tevent_req *req, *subreq;
417         struct pull_database_state *state;
418         struct ctdb_req_control request;
419
420         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
421         if (req == NULL) {
422                 return NULL;
423         }
424
425         state->ev = ev;
426         state->client = client;
427         state->recdb = recdb;
428         state->pnn = pnn;
429         state->srvid = srvid_next();
430
431         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
432                 subreq = ctdb_client_set_message_handler_send(
433                                         state, state->ev, state->client,
434                                         state->srvid, pull_database_handler,
435                                         req);
436                 if (tevent_req_nomem(subreq, req)) {
437                         return tevent_req_post(req, ev);
438                 }
439
440                 tevent_req_set_callback(subreq, pull_database_register_done,
441                                         req);
442
443         } else {
444                 struct ctdb_pulldb pulldb;
445
446                 pulldb.db_id = recdb_id(recdb);
447                 pulldb.lmaster = CTDB_LMASTER_ANY;
448
449                 ctdb_req_control_pull_db(&request, &pulldb);
450                 subreq = ctdb_client_control_send(state, state->ev,
451                                                   state->client,
452                                                   pnn, TIMEOUT(),
453                                                   &request);
454                 if (tevent_req_nomem(subreq, req)) {
455                         return tevent_req_post(req, ev);
456                 }
457                 tevent_req_set_callback(subreq, pull_database_old_done, req);
458         }
459
460         return req;
461 }
462
463 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
464                                   void *private_data)
465 {
466         struct tevent_req *req = talloc_get_type_abort(
467                 private_data, struct tevent_req);
468         struct pull_database_state *state = tevent_req_data(
469                 req, struct pull_database_state);
470         struct ctdb_rec_buffer *recbuf;
471         int ret;
472         bool status;
473
474         if (srvid != state->srvid) {
475                 return;
476         }
477
478         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
479         if (ret != 0) {
480                 D_ERR("Invalid data received for DB_PULL messages\n");
481                 return;
482         }
483
484         if (recbuf->db_id != recdb_id(state->recdb)) {
485                 talloc_free(recbuf);
486                 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
487                       recbuf->db_id, recdb_name(state->recdb));
488                 return;
489         }
490
491         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
492                            recbuf);
493         if (! status) {
494                 talloc_free(recbuf);
495                 D_ERR("Failed to add records to recdb for %s\n",
496                       recdb_name(state->recdb));
497                 return;
498         }
499
500         state->num_records += recbuf->count;
501         talloc_free(recbuf);
502 }
503
504 static void pull_database_register_done(struct tevent_req *subreq)
505 {
506         struct tevent_req *req = tevent_req_callback_data(
507                 subreq, struct tevent_req);
508         struct pull_database_state *state = tevent_req_data(
509                 req, struct pull_database_state);
510         struct ctdb_req_control request;
511         struct ctdb_pulldb_ext pulldb_ext;
512         int ret;
513         bool status;
514
515         status = ctdb_client_set_message_handler_recv(subreq, &ret);
516         TALLOC_FREE(subreq);
517         if (! status) {
518                 D_ERR("Failed to set message handler for DB_PULL for %s\n",
519                       recdb_name(state->recdb));
520                 tevent_req_error(req, ret);
521                 return;
522         }
523
524         pulldb_ext.db_id = recdb_id(state->recdb);
525         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
526         pulldb_ext.srvid = state->srvid;
527
528         ctdb_req_control_db_pull(&request, &pulldb_ext);
529         subreq = ctdb_client_control_send(state, state->ev, state->client,
530                                           state->pnn, TIMEOUT(), &request);
531         if (tevent_req_nomem(subreq, req)) {
532                 return;
533         }
534         tevent_req_set_callback(subreq, pull_database_new_done, req);
535 }
536
537 static void pull_database_old_done(struct tevent_req *subreq)
538 {
539         struct tevent_req *req = tevent_req_callback_data(
540                 subreq, struct tevent_req);
541         struct pull_database_state *state = tevent_req_data(
542                 req, struct pull_database_state);
543         struct ctdb_reply_control *reply;
544         struct ctdb_rec_buffer *recbuf;
545         int ret;
546         bool status;
547
548         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
549         TALLOC_FREE(subreq);
550         if (! status) {
551                 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
552                       recdb_name(state->recdb), state->pnn, ret);
553                 tevent_req_error(req, ret);
554                 return;
555         }
556
557         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
558         talloc_free(reply);
559         if (ret != 0) {
560                 tevent_req_error(req, ret);
561                 return;
562         }
563
564         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
565                            recbuf);
566         if (! status) {
567                 talloc_free(recbuf);
568                 tevent_req_error(req, EIO);
569                 return;
570         }
571
572         state->num_records = recbuf->count;
573         talloc_free(recbuf);
574
575         D_INFO("Pulled %d records for db %s from node %d\n",
576                state->num_records, recdb_name(state->recdb), state->pnn);
577
578         tevent_req_done(req);
579 }
580
581 static void pull_database_new_done(struct tevent_req *subreq)
582 {
583         struct tevent_req *req = tevent_req_callback_data(
584                 subreq, struct tevent_req);
585         struct pull_database_state *state = tevent_req_data(
586                 req, struct pull_database_state);
587         struct ctdb_reply_control *reply;
588         uint32_t num_records;
589         int ret;
590         bool status;
591
592         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
593         TALLOC_FREE(subreq);
594         if (! status) {
595                 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
596                       recdb_name(state->recdb), state->pnn, ret);
597                 tevent_req_error(req, ret);
598                 return;
599         }
600
601         ret = ctdb_reply_control_db_pull(reply, &num_records);
602         talloc_free(reply);
603         if (num_records != state->num_records) {
604                 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
605                       num_records, state->num_records,
606                       recdb_name(state->recdb));
607                 tevent_req_error(req, EIO);
608                 return;
609         }
610
611         D_INFO("Pulled %d records for db %s from node %d\n",
612                state->num_records, recdb_name(state->recdb), state->pnn);
613
614         subreq = ctdb_client_remove_message_handler_send(
615                                         state, state->ev, state->client,
616                                         state->srvid, req);
617         if (tevent_req_nomem(subreq, req)) {
618                 return;
619         }
620         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
621 }
622
623 static void pull_database_unregister_done(struct tevent_req *subreq)
624 {
625         struct tevent_req *req = tevent_req_callback_data(
626                 subreq, struct tevent_req);
627         struct pull_database_state *state = tevent_req_data(
628                 req, struct pull_database_state);
629         int ret;
630         bool status;
631
632         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
633         TALLOC_FREE(subreq);
634         if (! status) {
635                 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
636                       recdb_name(state->recdb));
637                 tevent_req_error(req, ret);
638                 return;
639         }
640
641         tevent_req_done(req);
642 }
643
/* Collect the result of pull_database_send(); see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
648
649 /*
650  * Push database to specified nodes (old style)
651  */
652
653 struct push_database_old_state {
654         struct tevent_context *ev;
655         struct ctdb_client_context *client;
656         struct recdb_context *recdb;
657         uint32_t *pnn_list;
658         int count;
659         struct ctdb_rec_buffer *recbuf;
660         int index;
661 };
662
663 static void push_database_old_push_done(struct tevent_req *subreq);
664
665 static struct tevent_req *push_database_old_send(
666                         TALLOC_CTX *mem_ctx,
667                         struct tevent_context *ev,
668                         struct ctdb_client_context *client,
669                         uint32_t *pnn_list, int count,
670                         struct recdb_context *recdb)
671 {
672         struct tevent_req *req, *subreq;
673         struct push_database_old_state *state;
674         struct ctdb_req_control request;
675         uint32_t pnn;
676
677         req = tevent_req_create(mem_ctx, &state,
678                                 struct push_database_old_state);
679         if (req == NULL) {
680                 return NULL;
681         }
682
683         state->ev = ev;
684         state->client = client;
685         state->recdb = recdb;
686         state->pnn_list = pnn_list;
687         state->count = count;
688         state->index = 0;
689
690         state->recbuf = recdb_records(recdb, state,
691                                       ctdb_client_pnn(client));
692         if (tevent_req_nomem(state->recbuf, req)) {
693                 return tevent_req_post(req, ev);
694         }
695
696         pnn = state->pnn_list[state->index];
697
698         ctdb_req_control_push_db(&request, state->recbuf);
699         subreq = ctdb_client_control_send(state, ev, client, pnn,
700                                           TIMEOUT(), &request);
701         if (tevent_req_nomem(subreq, req)) {
702                 return tevent_req_post(req, ev);
703         }
704         tevent_req_set_callback(subreq, push_database_old_push_done, req);
705
706         return req;
707 }
708
709 static void push_database_old_push_done(struct tevent_req *subreq)
710 {
711         struct tevent_req *req = tevent_req_callback_data(
712                 subreq, struct tevent_req);
713         struct push_database_old_state *state = tevent_req_data(
714                 req, struct push_database_old_state);
715         struct ctdb_req_control request;
716         uint32_t pnn;
717         int ret;
718         bool status;
719
720         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
721         TALLOC_FREE(subreq);
722         if (! status) {
723                 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
724                       recdb_name(state->recdb), state->pnn_list[state->index],
725                       ret);
726                 tevent_req_error(req, ret);
727                 return;
728         }
729
730         state->index += 1;
731         if (state->index == state->count) {
732                 TALLOC_FREE(state->recbuf);
733                 tevent_req_done(req);
734                 return;
735         }
736
737         pnn = state->pnn_list[state->index];
738
739         ctdb_req_control_push_db(&request, state->recbuf);
740         subreq = ctdb_client_control_send(state, state->ev, state->client,
741                                           pnn, TIMEOUT(), &request);
742         if (tevent_req_nomem(subreq, req)) {
743                 return;
744         }
745         tevent_req_set_callback(subreq, push_database_old_push_done, req);
746 }
747
/* Collect the result of push_database_old_send(); see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
752
753 /*
754  * Push database to specified nodes (new style)
755  */
756
757 struct push_database_new_state {
758         struct tevent_context *ev;
759         struct ctdb_client_context *client;
760         struct recdb_context *recdb;
761         uint32_t *pnn_list;
762         int count;
763         uint64_t srvid;
764         uint32_t dmaster;
765         int fd;
766         int num_buffers;
767         int num_buffers_sent;
768         int num_records;
769 };
770
771 static void push_database_new_started(struct tevent_req *subreq);
772 static void push_database_new_send_msg(struct tevent_req *req);
773 static void push_database_new_send_done(struct tevent_req *subreq);
774 static void push_database_new_confirmed(struct tevent_req *subreq);
775
776 static struct tevent_req *push_database_new_send(
777                         TALLOC_CTX *mem_ctx,
778                         struct tevent_context *ev,
779                         struct ctdb_client_context *client,
780                         uint32_t *pnn_list, int count,
781                         struct recdb_context *recdb,
782                         int max_size)
783 {
784         struct tevent_req *req, *subreq;
785         struct push_database_new_state *state;
786         struct ctdb_req_control request;
787         struct ctdb_pulldb_ext pulldb_ext;
788         char *filename;
789         off_t offset;
790
791         req = tevent_req_create(mem_ctx, &state,
792                                 struct push_database_new_state);
793         if (req == NULL) {
794                 return NULL;
795         }
796
797         state->ev = ev;
798         state->client = client;
799         state->recdb = recdb;
800         state->pnn_list = pnn_list;
801         state->count = count;
802
803         state->srvid = srvid_next();
804         state->dmaster = ctdb_client_pnn(client);
805         state->num_buffers_sent = 0;
806         state->num_records = 0;
807
808         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
809         if (tevent_req_nomem(filename, req)) {
810                 return tevent_req_post(req, ev);
811         }
812
813         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
814         if (state->fd == -1) {
815                 tevent_req_error(req, errno);
816                 return tevent_req_post(req, ev);
817         }
818         unlink(filename);
819         talloc_free(filename);
820
821         state->num_buffers = recdb_file(recdb, state, state->dmaster,
822                                         state->fd, max_size);
823         if (state->num_buffers == -1) {
824                 tevent_req_error(req, ENOMEM);
825                 return tevent_req_post(req, ev);
826         }
827
828         offset = lseek(state->fd, 0, SEEK_SET);
829         if (offset != 0) {
830                 tevent_req_error(req, EIO);
831                 return tevent_req_post(req, ev);
832         }
833
834         pulldb_ext.db_id = recdb_id(recdb);
835         pulldb_ext.srvid = state->srvid;
836
837         ctdb_req_control_db_push_start(&request, &pulldb_ext);
838         subreq = ctdb_client_control_multi_send(state, ev, client,
839                                                 pnn_list, count,
840                                                 TIMEOUT(), &request);
841         if (tevent_req_nomem(subreq, req)) {
842                 return tevent_req_post(req, ev);
843         }
844         tevent_req_set_callback(subreq, push_database_new_started, req);
845
846         return req;
847 }
848
849 static void push_database_new_started(struct tevent_req *subreq)
850 {
851         struct tevent_req *req = tevent_req_callback_data(
852                 subreq, struct tevent_req);
853         struct push_database_new_state *state = tevent_req_data(
854                 req, struct push_database_new_state);
855         int *err_list;
856         int ret;
857         bool status;
858
859         status = ctdb_client_control_multi_recv(subreq, &ret, state,
860                                                 &err_list, NULL);
861         TALLOC_FREE(subreq);
862         if (! status) {
863                 int ret2;
864                 uint32_t pnn;
865
866                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
867                                                        state->count,
868                                                        err_list, &pnn);
869                 if (ret2 != 0) {
870                         D_ERR("control DB_PUSH_START failed for db %s"
871                               " on node %u, ret=%d\n",
872                               recdb_name(state->recdb), pnn, ret2);
873                 } else {
874                         D_ERR("control DB_PUSH_START failed for db %s,"
875                               " ret=%d\n",
876                               recdb_name(state->recdb), ret);
877                 }
878                 talloc_free(err_list);
879
880                 tevent_req_error(req, ret);
881                 return;
882         }
883
884         push_database_new_send_msg(req);
885 }
886
887 static void push_database_new_send_msg(struct tevent_req *req)
888 {
889         struct push_database_new_state *state = tevent_req_data(
890                 req, struct push_database_new_state);
891         struct tevent_req *subreq;
892         struct ctdb_rec_buffer *recbuf;
893         struct ctdb_req_message message;
894         TDB_DATA data;
895         int ret;
896
897         if (state->num_buffers_sent == state->num_buffers) {
898                 struct ctdb_req_control request;
899
900                 ctdb_req_control_db_push_confirm(&request,
901                                                  recdb_id(state->recdb));
902                 subreq = ctdb_client_control_multi_send(state, state->ev,
903                                                         state->client,
904                                                         state->pnn_list,
905                                                         state->count,
906                                                         TIMEOUT(), &request);
907                 if (tevent_req_nomem(subreq, req)) {
908                         return;
909                 }
910                 tevent_req_set_callback(subreq, push_database_new_confirmed,
911                                         req);
912                 return;
913         }
914
915         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
916         if (ret != 0) {
917                 tevent_req_error(req, ret);
918                 return;
919         }
920
921         data.dsize = ctdb_rec_buffer_len(recbuf);
922         data.dptr = talloc_size(state, data.dsize);
923         if (tevent_req_nomem(data.dptr, req)) {
924                 return;
925         }
926
927         ctdb_rec_buffer_push(recbuf, data.dptr);
928
929         message.srvid = state->srvid;
930         message.data.data = data;
931
932         D_DEBUG("Pushing buffer %d with %d records for db %s\n",
933                 state->num_buffers_sent, recbuf->count,
934                 recdb_name(state->recdb));
935
936         subreq = ctdb_client_message_multi_send(state, state->ev,
937                                                 state->client,
938                                                 state->pnn_list, state->count,
939                                                 &message);
940         if (tevent_req_nomem(subreq, req)) {
941                 return;
942         }
943         tevent_req_set_callback(subreq, push_database_new_send_done, req);
944
945         state->num_records += recbuf->count;
946
947         talloc_free(data.dptr);
948         talloc_free(recbuf);
949 }
950
951 static void push_database_new_send_done(struct tevent_req *subreq)
952 {
953         struct tevent_req *req = tevent_req_callback_data(
954                 subreq, struct tevent_req);
955         struct push_database_new_state *state = tevent_req_data(
956                 req, struct push_database_new_state);
957         bool status;
958         int ret;
959
960         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
961         TALLOC_FREE(subreq);
962         if (! status) {
963                 D_ERR("Sending recovery records failed for %s\n",
964                       recdb_name(state->recdb));
965                 tevent_req_error(req, ret);
966                 return;
967         }
968
969         state->num_buffers_sent += 1;
970
971         push_database_new_send_msg(req);
972 }
973
974 static void push_database_new_confirmed(struct tevent_req *subreq)
975 {
976         struct tevent_req *req = tevent_req_callback_data(
977                 subreq, struct tevent_req);
978         struct push_database_new_state *state = tevent_req_data(
979                 req, struct push_database_new_state);
980         struct ctdb_reply_control **reply;
981         int *err_list;
982         bool status;
983         int ret, i;
984         uint32_t num_records;
985
986         status = ctdb_client_control_multi_recv(subreq, &ret, state,
987                                                 &err_list, &reply);
988         TALLOC_FREE(subreq);
989         if (! status) {
990                 int ret2;
991                 uint32_t pnn;
992
993                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
994                                                        state->count, err_list,
995                                                        &pnn);
996                 if (ret2 != 0) {
997                         D_ERR("control DB_PUSH_CONFIRM failed for db %s"
998                               " on node %u, ret=%d\n",
999                               recdb_name(state->recdb), pnn, ret2);
1000                 } else {
1001                         D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1002                               " ret=%d\n",
1003                               recdb_name(state->recdb), ret);
1004                 }
1005                 tevent_req_error(req, ret);
1006                 return;
1007         }
1008
1009         for (i=0; i<state->count; i++) {
1010                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1011                                                          &num_records);
1012                 if (ret != 0) {
1013                         tevent_req_error(req, EPROTO);
1014                         return;
1015                 }
1016
1017                 if (num_records != state->num_records) {
1018                         D_ERR("Node %u received %d of %d records for %s\n",
1019                               state->pnn_list[i], num_records,
1020                               state->num_records, recdb_name(state->recdb));
1021                         tevent_req_error(req, EPROTO);
1022                         return;
1023                 }
1024         }
1025
1026         talloc_free(reply);
1027
1028         D_INFO("Pushed %d records for db %s\n",
1029                state->num_records, recdb_name(state->recdb));
1030
1031         tevent_req_done(req);
1032 }
1033
/*
 * Collect the result of a push_database_new request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1038
1039 /*
1040  * wrapper for push_database_old and push_database_new
1041  */
1042
/*
 * State of the push_database wrapper: one flag per sub-request.  A flag
 * is also set straight away when the corresponding node list is empty
 * and the sub-request is therefore never started.
 */
struct push_database_state {
	bool old_done;	/* push_database_old finished or not needed */
	bool new_done;	/* push_database_new finished or not needed */
};

/* Completion callbacks for the two sub-requests */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1049
1050 static struct tevent_req *push_database_send(
1051                         TALLOC_CTX *mem_ctx,
1052                         struct tevent_context *ev,
1053                         struct ctdb_client_context *client,
1054                         uint32_t *pnn_list, int count, uint32_t *caps,
1055                         struct ctdb_tunable_list *tun_list,
1056                         struct recdb_context *recdb)
1057 {
1058         struct tevent_req *req, *subreq;
1059         struct push_database_state *state;
1060         uint32_t *old_list, *new_list;
1061         int old_count, new_count;
1062         int i;
1063
1064         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1065         if (req == NULL) {
1066                 return NULL;
1067         }
1068
1069         state->old_done = false;
1070         state->new_done = false;
1071
1072         old_count = 0;
1073         new_count = 0;
1074         old_list = talloc_array(state, uint32_t, count);
1075         new_list = talloc_array(state, uint32_t, count);
1076         if (tevent_req_nomem(old_list, req) ||
1077             tevent_req_nomem(new_list,req)) {
1078                 return tevent_req_post(req, ev);
1079         }
1080
1081         for (i=0; i<count; i++) {
1082                 uint32_t pnn = pnn_list[i];
1083
1084                 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1085                         new_list[new_count] = pnn;
1086                         new_count += 1;
1087                 } else {
1088                         old_list[old_count] = pnn;
1089                         old_count += 1;
1090                 }
1091         }
1092
1093         if (old_count > 0) {
1094                 subreq = push_database_old_send(state, ev, client,
1095                                                 old_list, old_count, recdb);
1096                 if (tevent_req_nomem(subreq, req)) {
1097                         return tevent_req_post(req, ev);
1098                 }
1099                 tevent_req_set_callback(subreq, push_database_old_done, req);
1100         } else {
1101                 state->old_done = true;
1102         }
1103
1104         if (new_count > 0) {
1105                 subreq = push_database_new_send(state, ev, client,
1106                                                 new_list, new_count, recdb,
1107                                                 tun_list->rec_buffer_size_limit);
1108                 if (tevent_req_nomem(subreq, req)) {
1109                         return tevent_req_post(req, ev);
1110                 }
1111                 tevent_req_set_callback(subreq, push_database_new_done, req);
1112         } else {
1113                 state->new_done = true;
1114         }
1115
1116         return req;
1117 }
1118
1119 static void push_database_old_done(struct tevent_req *subreq)
1120 {
1121         struct tevent_req *req = tevent_req_callback_data(
1122                 subreq, struct tevent_req);
1123         struct push_database_state *state = tevent_req_data(
1124                 req, struct push_database_state);
1125         bool status;
1126         int ret;
1127
1128         status = push_database_old_recv(subreq, &ret);
1129         if (! status) {
1130                 tevent_req_error(req, ret);
1131                 return;
1132         }
1133
1134         state->old_done = true;
1135
1136         if (state->old_done && state->new_done) {
1137                 tevent_req_done(req);
1138         }
1139 }
1140
1141 static void push_database_new_done(struct tevent_req *subreq)
1142 {
1143         struct tevent_req *req = tevent_req_callback_data(
1144                 subreq, struct tevent_req);
1145         struct push_database_state *state = tevent_req_data(
1146                 req, struct push_database_state);
1147         bool status;
1148         int ret;
1149
1150         status = push_database_new_recv(subreq, &ret);
1151         if (! status) {
1152                 tevent_req_error(req, ret);
1153                 return;
1154         }
1155
1156         state->new_done = true;
1157
1158         if (state->old_done && state->new_done) {
1159                 tevent_req_done(req);
1160         }
1161 }
1162
/*
 * Collect the result of a push_database request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1167
1168 /*
1169  * Collect databases using highest sequence number
1170  */
1171
/* State for collecting a database from the node with the highest seqnum */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;

	/* Nodes that are asked for their sequence number */
	uint32_t *pnn_list;
	int count;

	/* Both arrays are indexed by pnn */
	uint32_t *caps;
	uint32_t *ban_credits;

	uint32_t db_id;
	struct recdb_context *recdb;

	/* Node the database is pulled from */
	uint32_t max_pnn;
};

/* Callbacks for the sequence number query and the database pull */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1186
1187 static struct tevent_req *collect_highseqnum_db_send(
1188                         TALLOC_CTX *mem_ctx,
1189                         struct tevent_context *ev,
1190                         struct ctdb_client_context *client,
1191                         uint32_t *pnn_list, int count, uint32_t *caps,
1192                         uint32_t *ban_credits, uint32_t db_id,
1193                         struct recdb_context *recdb)
1194 {
1195         struct tevent_req *req, *subreq;
1196         struct collect_highseqnum_db_state *state;
1197         struct ctdb_req_control request;
1198
1199         req = tevent_req_create(mem_ctx, &state,
1200                                 struct collect_highseqnum_db_state);
1201         if (req == NULL) {
1202                 return NULL;
1203         }
1204
1205         state->ev = ev;
1206         state->client = client;
1207         state->pnn_list = pnn_list;
1208         state->count = count;
1209         state->caps = caps;
1210         state->ban_credits = ban_credits;
1211         state->db_id = db_id;
1212         state->recdb = recdb;
1213
1214         ctdb_req_control_get_db_seqnum(&request, db_id);
1215         subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1216                                                 state->pnn_list, state->count,
1217                                                 TIMEOUT(), &request);
1218         if (tevent_req_nomem(subreq, req)) {
1219                 return tevent_req_post(req, ev);
1220         }
1221         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1222                                 req);
1223
1224         return req;
1225 }
1226
1227 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1228 {
1229         struct tevent_req *req = tevent_req_callback_data(
1230                 subreq, struct tevent_req);
1231         struct collect_highseqnum_db_state *state = tevent_req_data(
1232                 req, struct collect_highseqnum_db_state);
1233         struct ctdb_reply_control **reply;
1234         int *err_list;
1235         bool status;
1236         int ret, i;
1237         uint64_t seqnum, max_seqnum;
1238
1239         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1240                                                 &err_list, &reply);
1241         TALLOC_FREE(subreq);
1242         if (! status) {
1243                 int ret2;
1244                 uint32_t pnn;
1245
1246                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1247                                                        state->count, err_list,
1248                                                        &pnn);
1249                 if (ret2 != 0) {
1250                         D_ERR("control GET_DB_SEQNUM failed for db %s"
1251                               " on node %u, ret=%d\n",
1252                               recdb_name(state->recdb), pnn, ret2);
1253                 } else {
1254                         D_ERR("control GET_DB_SEQNUM failed for db %s,"
1255                               " ret=%d\n",
1256                               recdb_name(state->recdb), ret);
1257                 }
1258                 tevent_req_error(req, ret);
1259                 return;
1260         }
1261
1262         max_seqnum = 0;
1263         state->max_pnn = state->pnn_list[0];
1264         for (i=0; i<state->count; i++) {
1265                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1266                 if (ret != 0) {
1267                         tevent_req_error(req, EPROTO);
1268                         return;
1269                 }
1270
1271                 if (max_seqnum < seqnum) {
1272                         max_seqnum = seqnum;
1273                         state->max_pnn = state->pnn_list[i];
1274                 }
1275         }
1276
1277         talloc_free(reply);
1278
1279         D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1280                recdb_name(state->recdb), state->max_pnn, max_seqnum);
1281
1282         subreq = pull_database_send(state, state->ev, state->client,
1283                                     state->max_pnn,
1284                                     state->caps[state->max_pnn],
1285                                     state->recdb);
1286         if (tevent_req_nomem(subreq, req)) {
1287                 return;
1288         }
1289         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1290                                 req);
1291 }
1292
1293 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1294 {
1295         struct tevent_req *req = tevent_req_callback_data(
1296                 subreq, struct tevent_req);
1297         struct collect_highseqnum_db_state *state = tevent_req_data(
1298                 req, struct collect_highseqnum_db_state);
1299         int ret;
1300         bool status;
1301
1302         status = pull_database_recv(subreq, &ret);
1303         TALLOC_FREE(subreq);
1304         if (! status) {
1305                 state->ban_credits[state->max_pnn] += 1;
1306                 tevent_req_error(req, ret);
1307                 return;
1308         }
1309
1310         tevent_req_done(req);
1311 }
1312
/*
 * Collect the result of a collect_highseqnum_db request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1317
1318 /*
1319  * Collect all databases
1320  */
1321
1322 struct collect_all_db_state {
1323         struct tevent_context *ev;
1324         struct ctdb_client_context *client;
1325         uint32_t *pnn_list;
1326         int count;
1327         uint32_t *caps;
1328         uint32_t *ban_credits;
1329         uint32_t db_id;
1330         struct recdb_context *recdb;
1331         struct ctdb_pulldb pulldb;
1332         int index;
1333 };
1334
1335 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1336
1337 static struct tevent_req *collect_all_db_send(
1338                         TALLOC_CTX *mem_ctx,
1339                         struct tevent_context *ev,
1340                         struct ctdb_client_context *client,
1341                         uint32_t *pnn_list, int count, uint32_t *caps,
1342                         uint32_t *ban_credits, uint32_t db_id,
1343                         struct recdb_context *recdb)
1344 {
1345         struct tevent_req *req, *subreq;
1346         struct collect_all_db_state *state;
1347         uint32_t pnn;
1348
1349         req = tevent_req_create(mem_ctx, &state,
1350                                 struct collect_all_db_state);
1351         if (req == NULL) {
1352                 return NULL;
1353         }
1354
1355         state->ev = ev;
1356         state->client = client;
1357         state->pnn_list = pnn_list;
1358         state->count = count;
1359         state->caps = caps;
1360         state->ban_credits = ban_credits;
1361         state->db_id = db_id;
1362         state->recdb = recdb;
1363         state->index = 0;
1364
1365         pnn = state->pnn_list[state->index];
1366
1367         subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1368         if (tevent_req_nomem(subreq, req)) {
1369                 return tevent_req_post(req, ev);
1370         }
1371         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1372
1373         return req;
1374 }
1375
1376 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1377 {
1378         struct tevent_req *req = tevent_req_callback_data(
1379                 subreq, struct tevent_req);
1380         struct collect_all_db_state *state = tevent_req_data(
1381                 req, struct collect_all_db_state);
1382         uint32_t pnn;
1383         int ret;
1384         bool status;
1385
1386         status = pull_database_recv(subreq, &ret);
1387         TALLOC_FREE(subreq);
1388         if (! status) {
1389                 pnn = state->pnn_list[state->index];
1390                 state->ban_credits[pnn] += 1;
1391                 tevent_req_error(req, ret);
1392                 return;
1393         }
1394
1395         state->index += 1;
1396         if (state->index == state->count) {
1397                 tevent_req_done(req);
1398                 return;
1399         }
1400
1401         pnn = state->pnn_list[state->index];
1402         subreq = pull_database_send(state, state->ev, state->client,
1403                                     pnn, state->caps[pnn], state->recdb);
1404         if (tevent_req_nomem(subreq, req)) {
1405                 return;
1406         }
1407         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1408 }
1409
/*
 * Collect the result of a collect_all_db request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1414
1415
1416 /**
1417  * For each database do the following:
1418  *  - Get DB name
1419  *  - Get DB path
1420  *  - Freeze database on all nodes
1421  *  - Start transaction on all nodes
1422  *  - Collect database from all nodes
1423  *  - Wipe database on all nodes
1424  *  - Push database to all nodes
1425  *  - Commit transaction on all nodes
1426  *  - Thaw database on all nodes
1427  */
1428
1429 struct recover_db_state {
1430         struct tevent_context *ev;
1431         struct ctdb_client_context *client;
1432         struct ctdb_tunable_list *tun_list;
1433         uint32_t *pnn_list;
1434         int count;
1435         uint32_t *caps;
1436         uint32_t *ban_credits;
1437         uint32_t db_id;
1438         bool persistent;
1439
1440         uint32_t destnode;
1441         struct ctdb_transdb transdb;
1442
1443         const char *db_name, *db_path;
1444         struct recdb_context *recdb;
1445 };
1446
1447 static void recover_db_name_done(struct tevent_req *subreq);
1448 static void recover_db_path_done(struct tevent_req *subreq);
1449 static void recover_db_freeze_done(struct tevent_req *subreq);
1450 static void recover_db_transaction_started(struct tevent_req *subreq);
1451 static void recover_db_collect_done(struct tevent_req *subreq);
1452 static void recover_db_wipedb_done(struct tevent_req *subreq);
1453 static void recover_db_pushdb_done(struct tevent_req *subreq);
1454 static void recover_db_transaction_committed(struct tevent_req *subreq);
1455 static void recover_db_thaw_done(struct tevent_req *subreq);
1456
1457 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1458                                           struct tevent_context *ev,
1459                                           struct ctdb_client_context *client,
1460                                           struct ctdb_tunable_list *tun_list,
1461                                           uint32_t *pnn_list, int count,
1462                                           uint32_t *caps,
1463                                           uint32_t *ban_credits,
1464                                           uint32_t generation,
1465                                           uint32_t db_id, bool persistent)
1466 {
1467         struct tevent_req *req, *subreq;
1468         struct recover_db_state *state;
1469         struct ctdb_req_control request;
1470
1471         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1472         if (req == NULL) {
1473                 return NULL;
1474         }
1475
1476         state->ev = ev;
1477         state->client = client;
1478         state->tun_list = tun_list;
1479         state->pnn_list = pnn_list;
1480         state->count = count;
1481         state->caps = caps;
1482         state->ban_credits = ban_credits;
1483         state->db_id = db_id;
1484         state->persistent = persistent;
1485
1486         state->destnode = ctdb_client_pnn(client);
1487         state->transdb.db_id = db_id;
1488         state->transdb.tid = generation;
1489
1490         ctdb_req_control_get_dbname(&request, db_id);
1491         subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1492                                           TIMEOUT(), &request);
1493         if (tevent_req_nomem(subreq, req)) {
1494                 return tevent_req_post(req, ev);
1495         }
1496         tevent_req_set_callback(subreq, recover_db_name_done, req);
1497
1498         return req;
1499 }
1500
1501 static void recover_db_name_done(struct tevent_req *subreq)
1502 {
1503         struct tevent_req *req = tevent_req_callback_data(
1504                 subreq, struct tevent_req);
1505         struct recover_db_state *state = tevent_req_data(
1506                 req, struct recover_db_state);
1507         struct ctdb_reply_control *reply;
1508         struct ctdb_req_control request;
1509         int ret;
1510         bool status;
1511
1512         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1513         TALLOC_FREE(subreq);
1514         if (! status) {
1515                 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1516                       state->db_id, ret);
1517                 tevent_req_error(req, ret);
1518                 return;
1519         }
1520
1521         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1522         if (ret != 0) {
1523                 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1524                       state->db_id, ret);
1525                 tevent_req_error(req, EPROTO);
1526                 return;
1527         }
1528
1529         talloc_free(reply);
1530
1531         ctdb_req_control_getdbpath(&request, state->db_id);
1532         subreq = ctdb_client_control_send(state, state->ev, state->client,
1533                                           state->destnode, TIMEOUT(),
1534                                           &request);
1535         if (tevent_req_nomem(subreq, req)) {
1536                 return;
1537         }
1538         tevent_req_set_callback(subreq, recover_db_path_done, req);
1539 }
1540
1541 static void recover_db_path_done(struct tevent_req *subreq)
1542 {
1543         struct tevent_req *req = tevent_req_callback_data(
1544                 subreq, struct tevent_req);
1545         struct recover_db_state *state = tevent_req_data(
1546                 req, struct recover_db_state);
1547         struct ctdb_reply_control *reply;
1548         struct ctdb_req_control request;
1549         int ret;
1550         bool status;
1551
1552         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1553         TALLOC_FREE(subreq);
1554         if (! status) {
1555                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1556                       state->db_name, ret);
1557                 tevent_req_error(req, ret);
1558                 return;
1559         }
1560
1561         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1562         if (ret != 0) {
1563                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1564                       state->db_name, ret);
1565                 tevent_req_error(req, EPROTO);
1566                 return;
1567         }
1568
1569         talloc_free(reply);
1570
1571         ctdb_req_control_db_freeze(&request, state->db_id);
1572         subreq = ctdb_client_control_multi_send(state, state->ev,
1573                                                 state->client,
1574                                                 state->pnn_list, state->count,
1575                                                 TIMEOUT(), &request);
1576         if (tevent_req_nomem(subreq, req)) {
1577                 return;
1578         }
1579         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1580 }
1581
1582 static void recover_db_freeze_done(struct tevent_req *subreq)
1583 {
1584         struct tevent_req *req = tevent_req_callback_data(
1585                 subreq, struct tevent_req);
1586         struct recover_db_state *state = tevent_req_data(
1587                 req, struct recover_db_state);
1588         struct ctdb_req_control request;
1589         int *err_list;
1590         int ret;
1591         bool status;
1592
1593         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1594                                                 NULL);
1595         TALLOC_FREE(subreq);
1596         if (! status) {
1597                 int ret2;
1598                 uint32_t pnn;
1599
1600                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1601                                                        state->count, err_list,
1602                                                        &pnn);
1603                 if (ret2 != 0) {
1604                         D_ERR("control FREEZE_DB failed for db %s"
1605                               " on node %u, ret=%d\n",
1606                               state->db_name, pnn, ret2);
1607                 } else {
1608                         D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1609                               state->db_name, ret);
1610                 }
1611                 tevent_req_error(req, ret);
1612                 return;
1613         }
1614
1615         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1616         subreq = ctdb_client_control_multi_send(state, state->ev,
1617                                                 state->client,
1618                                                 state->pnn_list, state->count,
1619                                                 TIMEOUT(), &request);
1620         if (tevent_req_nomem(subreq, req)) {
1621                 return;
1622         }
1623         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1624 }
1625
1626 static void recover_db_transaction_started(struct tevent_req *subreq)
1627 {
1628         struct tevent_req *req = tevent_req_callback_data(
1629                 subreq, struct tevent_req);
1630         struct recover_db_state *state = tevent_req_data(
1631                 req, struct recover_db_state);
1632         int *err_list;
1633         int ret;
1634         bool status;
1635
1636         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1637                                                 NULL);
1638         TALLOC_FREE(subreq);
1639         if (! status) {
1640                 int ret2;
1641                 uint32_t pnn;
1642
1643                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1644                                                        state->count,
1645                                                        err_list, &pnn);
1646                 if (ret2 != 0) {
1647                         D_ERR("control TRANSACTION_DB failed for db=%s"
1648                               " on node %u, ret=%d\n",
1649                               state->db_name, pnn, ret2);
1650                 } else {
1651                         D_ERR("control TRANSACTION_DB failed for db=%s,"
1652                               " ret=%d\n", state->db_name, ret);
1653                 }
1654                 tevent_req_error(req, ret);
1655                 return;
1656         }
1657
1658         state->recdb = recdb_create(state, state->db_id, state->db_name,
1659                                     state->db_path,
1660                                     state->tun_list->database_hash_size,
1661                                     state->persistent);
1662         if (tevent_req_nomem(state->recdb, req)) {
1663                 return;
1664         }
1665
1666         if (state->persistent) {
1667                 subreq = collect_highseqnum_db_send(
1668                                 state, state->ev, state->client,
1669                                 state->pnn_list, state->count, state->caps,
1670                                 state->ban_credits, state->db_id,
1671                                 state->recdb);
1672         } else {
1673                 subreq = collect_all_db_send(
1674                                 state, state->ev, state->client,
1675                                 state->pnn_list, state->count, state->caps,
1676                                 state->ban_credits, state->db_id,
1677                                 state->recdb);
1678         }
1679         if (tevent_req_nomem(subreq, req)) {
1680                 return;
1681         }
1682         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1683 }
1684
1685 static void recover_db_collect_done(struct tevent_req *subreq)
1686 {
1687         struct tevent_req *req = tevent_req_callback_data(
1688                 subreq, struct tevent_req);
1689         struct recover_db_state *state = tevent_req_data(
1690                 req, struct recover_db_state);
1691         struct ctdb_req_control request;
1692         int ret;
1693         bool status;
1694
1695         if (state->persistent) {
1696                 status = collect_highseqnum_db_recv(subreq, &ret);
1697         } else {
1698                 status = collect_all_db_recv(subreq, &ret);
1699         }
1700         TALLOC_FREE(subreq);
1701         if (! status) {
1702                 tevent_req_error(req, ret);
1703                 return;
1704         }
1705
1706         ctdb_req_control_wipe_database(&request, &state->transdb);
1707         subreq = ctdb_client_control_multi_send(state, state->ev,
1708                                                 state->client,
1709                                                 state->pnn_list, state->count,
1710                                                 TIMEOUT(), &request);
1711         if (tevent_req_nomem(subreq, req)) {
1712                 return;
1713         }
1714         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1715 }
1716
1717 static void recover_db_wipedb_done(struct tevent_req *subreq)
1718 {
1719         struct tevent_req *req = tevent_req_callback_data(
1720                 subreq, struct tevent_req);
1721         struct recover_db_state *state = tevent_req_data(
1722                 req, struct recover_db_state);
1723         int *err_list;
1724         int ret;
1725         bool status;
1726
1727         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1728                                                 NULL);
1729         TALLOC_FREE(subreq);
1730         if (! status) {
1731                 int ret2;
1732                 uint32_t pnn;
1733
1734                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1735                                                        state->count,
1736                                                        err_list, &pnn);
1737                 if (ret2 != 0) {
1738                         D_ERR("control WIPEDB failed for db %s on node %u,"
1739                               " ret=%d\n", state->db_name, pnn, ret2);
1740                 } else {
1741                         D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1742                               state->db_name, ret);
1743                 }
1744                 tevent_req_error(req, ret);
1745                 return;
1746         }
1747
1748         subreq = push_database_send(state, state->ev, state->client,
1749                                     state->pnn_list, state->count,
1750                                     state->caps, state->tun_list,
1751                                     state->recdb);
1752         if (tevent_req_nomem(subreq, req)) {
1753                 return;
1754         }
1755         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1756 }
1757
1758 static void recover_db_pushdb_done(struct tevent_req *subreq)
1759 {
1760         struct tevent_req *req = tevent_req_callback_data(
1761                 subreq, struct tevent_req);
1762         struct recover_db_state *state = tevent_req_data(
1763                 req, struct recover_db_state);
1764         struct ctdb_req_control request;
1765         int ret;
1766         bool status;
1767
1768         status = push_database_recv(subreq, &ret);
1769         TALLOC_FREE(subreq);
1770         if (! status) {
1771                 tevent_req_error(req, ret);
1772                 return;
1773         }
1774
1775         TALLOC_FREE(state->recdb);
1776
1777         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1778         subreq = ctdb_client_control_multi_send(state, state->ev,
1779                                                 state->client,
1780                                                 state->pnn_list, state->count,
1781                                                 TIMEOUT(), &request);
1782         if (tevent_req_nomem(subreq, req)) {
1783                 return;
1784         }
1785         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1786 }
1787
1788 static void recover_db_transaction_committed(struct tevent_req *subreq)
1789 {
1790         struct tevent_req *req = tevent_req_callback_data(
1791                 subreq, struct tevent_req);
1792         struct recover_db_state *state = tevent_req_data(
1793                 req, struct recover_db_state);
1794         struct ctdb_req_control request;
1795         int *err_list;
1796         int ret;
1797         bool status;
1798
1799         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1800                                                 NULL);
1801         TALLOC_FREE(subreq);
1802         if (! status) {
1803                 int ret2;
1804                 uint32_t pnn;
1805
1806                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1807                                                        state->count,
1808                                                        err_list, &pnn);
1809                 if (ret2 != 0) {
1810                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1811                               " on node %u, ret=%d\n",
1812                               state->db_name, pnn, ret2);
1813                 } else {
1814                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1815                               " ret=%d\n", state->db_name, ret);
1816                 }
1817                 tevent_req_error(req, ret);
1818                 return;
1819         }
1820
1821         ctdb_req_control_db_thaw(&request, state->db_id);
1822         subreq = ctdb_client_control_multi_send(state, state->ev,
1823                                                 state->client,
1824                                                 state->pnn_list, state->count,
1825                                                 TIMEOUT(), &request);
1826         if (tevent_req_nomem(subreq, req)) {
1827                 return;
1828         }
1829         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1830 }
1831
1832 static void recover_db_thaw_done(struct tevent_req *subreq)
1833 {
1834         struct tevent_req *req = tevent_req_callback_data(
1835                 subreq, struct tevent_req);
1836         struct recover_db_state *state = tevent_req_data(
1837                 req, struct recover_db_state);
1838         int *err_list;
1839         int ret;
1840         bool status;
1841
1842         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1843                                                 NULL);
1844         TALLOC_FREE(subreq);
1845         if (! status) {
1846                 int ret2;
1847                 uint32_t pnn;
1848
1849                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1850                                                        state->count,
1851                                                        err_list, &pnn);
1852                 if (ret2 != 0) {
1853                         D_ERR("control DB_THAW failed for db %s on node %u,"
1854                               " ret=%d\n", state->db_name, pnn, ret2);
1855                 } else {
1856                         D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1857                               state->db_name, ret);
1858                 }
1859                 tevent_req_error(req, ret);
1860                 return;
1861         }
1862
1863         tevent_req_done(req);
1864 }
1865
1866 static bool recover_db_recv(struct tevent_req *req)
1867 {
1868         return generic_recv(req, NULL);
1869 }
1870
1871
/*
 * Start database recovery for each database
 *
 * Try to recover each database up to NUM_RETRIES times before failing
 * recovery.
 */
1877
/*
 * State of the request that recovers all databases in a dbmap.
 */
struct db_recovery_state {
	/* Event context, reused when a database recovery is retried */
	struct tevent_context *ev;
	/* Databases being recovered; dbmap->num replies are expected */
	struct ctdb_dbid_map *dbmap;
	/* Databases finished so far, successfully or not */
	int num_replies;
	/* Databases that could not be recovered */
	int num_failed;
};
1884
/*
 * Per-database state, used as callback data for each recover_db request.
 *
 * It keeps the arguments given to recover_db_send() so that the same
 * call can be issued again when an attempt fails.
 */
struct db_recovery_one_state {
	/* The overall db_recovery request this database belongs to */
	struct tevent_req *req;
	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;
	/* Nodes taking part in the recovery, and how many there are */
	uint32_t *pnn_list;
	int count;
	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;
	/* Database handled by this sub-request */
	uint32_t db_id;
	bool persistent;
	/* Number of failed attempts so far, compared against NUM_RETRIES */
	int num_fails;
};
1899
1900 static void db_recovery_one_done(struct tevent_req *subreq);
1901
1902 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1903                                            struct tevent_context *ev,
1904                                            struct ctdb_client_context *client,
1905                                            struct ctdb_dbid_map *dbmap,
1906                                            struct ctdb_tunable_list *tun_list,
1907                                            uint32_t *pnn_list, int count,
1908                                            uint32_t *caps,
1909                                            uint32_t *ban_credits,
1910                                            uint32_t generation)
1911 {
1912         struct tevent_req *req, *subreq;
1913         struct db_recovery_state *state;
1914         int i;
1915
1916         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1917         if (req == NULL) {
1918                 return NULL;
1919         }
1920
1921         state->ev = ev;
1922         state->dbmap = dbmap;
1923         state->num_replies = 0;
1924         state->num_failed = 0;
1925
1926         if (dbmap->num == 0) {
1927                 tevent_req_done(req);
1928                 return tevent_req_post(req, ev);
1929         }
1930
1931         for (i=0; i<dbmap->num; i++) {
1932                 struct db_recovery_one_state *substate;
1933
1934                 substate = talloc_zero(state, struct db_recovery_one_state);
1935                 if (tevent_req_nomem(substate, req)) {
1936                         return tevent_req_post(req, ev);
1937                 }
1938
1939                 substate->req = req;
1940                 substate->client = client;
1941                 substate->dbmap = dbmap;
1942                 substate->tun_list = tun_list;
1943                 substate->pnn_list = pnn_list;
1944                 substate->count = count;
1945                 substate->caps = caps;
1946                 substate->ban_credits = ban_credits;
1947                 substate->generation = generation;
1948                 substate->db_id = dbmap->dbs[i].db_id;
1949                 substate->persistent = dbmap->dbs[i].flags &
1950                                        CTDB_DB_FLAGS_PERSISTENT;
1951
1952                 subreq = recover_db_send(state, ev, client, tun_list,
1953                                          pnn_list, count, caps, ban_credits,
1954                                          generation, substate->db_id,
1955                                          substate->persistent);
1956                 if (tevent_req_nomem(subreq, req)) {
1957                         return tevent_req_post(req, ev);
1958                 }
1959                 tevent_req_set_callback(subreq, db_recovery_one_done,
1960                                         substate);
1961                 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1962         }
1963
1964         return req;
1965 }
1966
1967 static void db_recovery_one_done(struct tevent_req *subreq)
1968 {
1969         struct db_recovery_one_state *substate = tevent_req_callback_data(
1970                 subreq, struct db_recovery_one_state);
1971         struct tevent_req *req = substate->req;
1972         struct db_recovery_state *state = tevent_req_data(
1973                 req, struct db_recovery_state);
1974         bool status;
1975
1976         status = recover_db_recv(subreq);
1977         TALLOC_FREE(subreq);
1978
1979         if (status) {
1980                 talloc_free(substate);
1981                 goto done;
1982         }
1983
1984         substate->num_fails += 1;
1985         if (substate->num_fails < NUM_RETRIES) {
1986                 subreq = recover_db_send(state, state->ev, substate->client,
1987                                          substate->tun_list,
1988                                          substate->pnn_list, substate->count,
1989                                          substate->caps, substate->ban_credits,
1990                                          substate->generation, substate->db_id,
1991                                          substate->persistent);
1992                 if (tevent_req_nomem(subreq, req)) {
1993                         goto failed;
1994                 }
1995                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
1996                 D_NOTICE("recover database 0x%08x, attempt %d\n",
1997                          substate->db_id, substate->num_fails+1);
1998                 return;
1999         }
2000
2001 failed:
2002         state->num_failed += 1;
2003
2004 done:
2005         state->num_replies += 1;
2006
2007         if (state->num_replies == state->dbmap->num) {
2008                 tevent_req_done(req);
2009         }
2010 }
2011
2012 static bool db_recovery_recv(struct tevent_req *req, int *count)
2013 {
2014         struct db_recovery_state *state = tevent_req_data(
2015                 req, struct db_recovery_state);
2016         int err;
2017
2018         if (tevent_req_is_unix_error(req, &err)) {
2019                 *count = 0;
2020                 return false;
2021         }
2022
2023         *count = state->num_replies - state->num_failed;
2024
2025         if (state->num_failed > 0) {
2026                 return false;
2027         }
2028
2029         return true;
2030 }
2031
2032
2033 /*
2034  * Run the parallel database recovery
2035  *
2036  * - Get tunables
2037  * - Get nodemap
2038  * - Get vnnmap
2039  * - Get capabilities from all nodes
2040  * - Get dbmap
2041  * - Set RECOVERY_ACTIVE
2042  * - Send START_RECOVERY
2043  * - Update vnnmap on all nodes
2044  * - Run database recovery
2045  * - Set RECOVERY_NORMAL
2046  * - Send END_RECOVERY
2047  */
2048
/*
 * State carried through all steps of the parallel database recovery.
 */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Generation assigned to the newly calculated VNNMAP */
	uint32_t generation;
	/* Active nodes from list_of_active_nodes(), and their number */
	uint32_t *pnn_list;
	int count;
	/* PNN of the node this client is connected to */
	uint32_t destnode;
	struct ctdb_node_map *nodemap;
	/* Node capabilities; nodemap->num entries, indexed by PNN */
	uint32_t *caps;
	/* Zero-initialised array with nodemap->num entries */
	uint32_t *ban_credits;
	struct ctdb_tunable_list *tun_list;
	/* VNNMAP fetched from destnode, later replaced by the new one */
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};
2063
2064 static void recovery_tunables_done(struct tevent_req *subreq);
2065 static void recovery_nodemap_done(struct tevent_req *subreq);
2066 static void recovery_vnnmap_done(struct tevent_req *subreq);
2067 static void recovery_capabilities_done(struct tevent_req *subreq);
2068 static void recovery_dbmap_done(struct tevent_req *subreq);
2069 static void recovery_active_done(struct tevent_req *subreq);
2070 static void recovery_start_recovery_done(struct tevent_req *subreq);
2071 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2072 static void recovery_db_recovery_done(struct tevent_req *subreq);
2073 static void recovery_failed_done(struct tevent_req *subreq);
2074 static void recovery_normal_done(struct tevent_req *subreq);
2075 static void recovery_end_recovery_done(struct tevent_req *subreq);
2076
2077 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2078                                         struct tevent_context *ev,
2079                                         struct ctdb_client_context *client,
2080                                         uint32_t generation)
2081 {
2082         struct tevent_req *req, *subreq;
2083         struct recovery_state *state;
2084         struct ctdb_req_control request;
2085
2086         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2087         if (req == NULL) {
2088                 return NULL;
2089         }
2090
2091         state->ev = ev;
2092         state->client = client;
2093         state->generation = generation;
2094         state->destnode = ctdb_client_pnn(client);
2095
2096         ctdb_req_control_get_all_tunables(&request);
2097         subreq = ctdb_client_control_send(state, state->ev, state->client,
2098                                           state->destnode, TIMEOUT(),
2099                                           &request);
2100         if (tevent_req_nomem(subreq, req)) {
2101                 return tevent_req_post(req, ev);
2102         }
2103         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2104
2105         return req;
2106 }
2107
2108 static void recovery_tunables_done(struct tevent_req *subreq)
2109 {
2110         struct tevent_req *req = tevent_req_callback_data(
2111                 subreq, struct tevent_req);
2112         struct recovery_state *state = tevent_req_data(
2113                 req, struct recovery_state);
2114         struct ctdb_reply_control *reply;
2115         struct ctdb_req_control request;
2116         int ret;
2117         bool status;
2118
2119         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2120         TALLOC_FREE(subreq);
2121         if (! status) {
2122                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2123                 tevent_req_error(req, ret);
2124                 return;
2125         }
2126
2127         ret = ctdb_reply_control_get_all_tunables(reply, state,
2128                                                   &state->tun_list);
2129         if (ret != 0) {
2130                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2131                 tevent_req_error(req, EPROTO);
2132                 return;
2133         }
2134
2135         talloc_free(reply);
2136
2137         recover_timeout = state->tun_list->recover_timeout;
2138
2139         ctdb_req_control_get_nodemap(&request);
2140         subreq = ctdb_client_control_send(state, state->ev, state->client,
2141                                           state->destnode, TIMEOUT(),
2142                                           &request);
2143         if (tevent_req_nomem(subreq, req)) {
2144                 return;
2145         }
2146         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2147 }
2148
2149 static void recovery_nodemap_done(struct tevent_req *subreq)
2150 {
2151         struct tevent_req *req = tevent_req_callback_data(
2152                 subreq, struct tevent_req);
2153         struct recovery_state *state = tevent_req_data(
2154                 req, struct recovery_state);
2155         struct ctdb_reply_control *reply;
2156         struct ctdb_req_control request;
2157         bool status;
2158         int ret;
2159
2160         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2161         TALLOC_FREE(subreq);
2162         if (! status) {
2163                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2164                       state->destnode, ret);
2165                 tevent_req_error(req, ret);
2166                 return;
2167         }
2168
2169         ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2170         if (ret != 0) {
2171                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2172                 tevent_req_error(req, ret);
2173                 return;
2174         }
2175
2176         state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2177                                             state, &state->pnn_list);
2178         if (state->count <= 0) {
2179                 tevent_req_error(req, ENOMEM);
2180                 return;
2181         }
2182
2183         state->ban_credits = talloc_zero_array(state, uint32_t,
2184                                                state->nodemap->num);
2185         if (tevent_req_nomem(state->ban_credits, req)) {
2186                 return;
2187         }
2188
2189         ctdb_req_control_getvnnmap(&request);
2190         subreq = ctdb_client_control_send(state, state->ev, state->client,
2191                                           state->destnode, TIMEOUT(),
2192                                           &request);
2193         if (tevent_req_nomem(subreq, req)) {
2194                 return;
2195         }
2196         tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2197 }
2198
2199 static void recovery_vnnmap_done(struct tevent_req *subreq)
2200 {
2201         struct tevent_req *req = tevent_req_callback_data(
2202                 subreq, struct tevent_req);
2203         struct recovery_state *state = tevent_req_data(
2204                 req, struct recovery_state);
2205         struct ctdb_reply_control *reply;
2206         struct ctdb_req_control request;
2207         bool status;
2208         int ret;
2209
2210         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2211         TALLOC_FREE(subreq);
2212         if (! status) {
2213                 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2214                       state->destnode, ret);
2215                 tevent_req_error(req, ret);
2216                 return;
2217         }
2218
2219         ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2220         if (ret != 0) {
2221                 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2222                 tevent_req_error(req, ret);
2223                 return;
2224         }
2225
2226         ctdb_req_control_get_capabilities(&request);
2227         subreq = ctdb_client_control_multi_send(state, state->ev,
2228                                                 state->client,
2229                                                 state->pnn_list, state->count,
2230                                                 TIMEOUT(), &request);
2231         if (tevent_req_nomem(subreq, req)) {
2232                 return;
2233         }
2234         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2235 }
2236
2237 static void recovery_capabilities_done(struct tevent_req *subreq)
2238 {
2239         struct tevent_req *req = tevent_req_callback_data(
2240                 subreq, struct tevent_req);
2241         struct recovery_state *state = tevent_req_data(
2242                 req, struct recovery_state);
2243         struct ctdb_reply_control **reply;
2244         struct ctdb_req_control request;
2245         int *err_list;
2246         int ret, i;
2247         bool status;
2248
2249         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2250                                                 &reply);
2251         TALLOC_FREE(subreq);
2252         if (! status) {
2253                 int ret2;
2254                 uint32_t pnn;
2255
2256                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2257                                                        state->count,
2258                                                        err_list, &pnn);
2259                 if (ret2 != 0) {
2260                         D_ERR("control GET_CAPABILITIES failed on node %u,"
2261                               " ret=%d\n", pnn, ret2);
2262                 } else {
2263                         D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2264                               ret);
2265                 }
2266                 tevent_req_error(req, ret);
2267                 return;
2268         }
2269
2270         /* Make the array size same as nodemap */
2271         state->caps = talloc_zero_array(state, uint32_t,
2272                                         state->nodemap->num);
2273         if (tevent_req_nomem(state->caps, req)) {
2274                 return;
2275         }
2276
2277         for (i=0; i<state->count; i++) {
2278                 uint32_t pnn;
2279
2280                 pnn = state->pnn_list[i];
2281                 ret = ctdb_reply_control_get_capabilities(reply[i],
2282                                                           &state->caps[pnn]);
2283                 if (ret != 0) {
2284                         D_ERR("control GET_CAPABILITIES failed on node %u\n",
2285                               pnn);
2286                         tevent_req_error(req, EPROTO);
2287                         return;
2288                 }
2289         }
2290
2291         talloc_free(reply);
2292
2293         ctdb_req_control_get_dbmap(&request);
2294         subreq = ctdb_client_control_send(state, state->ev, state->client,
2295                                           state->destnode, TIMEOUT(),
2296                                           &request);
2297         if (tevent_req_nomem(subreq, req)) {
2298                 return;
2299         }
2300         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2301 }
2302
2303 static void recovery_dbmap_done(struct tevent_req *subreq)
2304 {
2305         struct tevent_req *req = tevent_req_callback_data(
2306                 subreq, struct tevent_req);
2307         struct recovery_state *state = tevent_req_data(
2308                 req, struct recovery_state);
2309         struct ctdb_reply_control *reply;
2310         struct ctdb_req_control request;
2311         int ret;
2312         bool status;
2313
2314         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2315         TALLOC_FREE(subreq);
2316         if (! status) {
2317                 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2318                       state->destnode, ret);
2319                 tevent_req_error(req, ret);
2320                 return;
2321         }
2322
2323         ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2324         if (ret != 0) {
2325                 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2326                 tevent_req_error(req, ret);
2327                 return;
2328         }
2329
2330         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2331         subreq = ctdb_client_control_multi_send(state, state->ev,
2332                                                 state->client,
2333                                                 state->pnn_list, state->count,
2334                                                 TIMEOUT(), &request);
2335         if (tevent_req_nomem(subreq, req)) {
2336                 return;
2337         }
2338         tevent_req_set_callback(subreq, recovery_active_done, req);
2339 }
2340
2341 static void recovery_active_done(struct tevent_req *subreq)
2342 {
2343         struct tevent_req *req = tevent_req_callback_data(
2344                 subreq, struct tevent_req);
2345         struct recovery_state *state = tevent_req_data(
2346                 req, struct recovery_state);
2347         struct ctdb_req_control request;
2348         struct ctdb_vnn_map *vnnmap;
2349         int *err_list;
2350         int ret, count, i;
2351         bool status;
2352
2353         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2354                                                 NULL);
2355         TALLOC_FREE(subreq);
2356         if (! status) {
2357                 int ret2;
2358                 uint32_t pnn;
2359
2360                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2361                                                        state->count,
2362                                                        err_list, &pnn);
2363                 if (ret2 != 0) {
2364                         D_ERR("failed to set recovery mode ACTIVE on node %u,"
2365                               " ret=%d\n", pnn, ret2);
2366                 } else {
2367                         D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2368                               ret);
2369                 }
2370                 tevent_req_error(req, ret);
2371                 return;
2372         }
2373
2374         D_ERR("Set recovery mode to ACTIVE\n");
2375
2376         /* Calculate new VNNMAP */
2377         count = 0;
2378         for (i=0; i<state->nodemap->num; i++) {
2379                 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2380                         continue;
2381                 }
2382                 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2383                         continue;
2384                 }
2385                 count += 1;
2386         }
2387
2388         if (count == 0) {
2389                 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2390         }
2391
2392         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2393         if (tevent_req_nomem(vnnmap, req)) {
2394                 return;
2395         }
2396
2397         vnnmap->size = (count == 0 ? 1 : count);
2398         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2399         if (tevent_req_nomem(vnnmap->map, req)) {
2400                 return;
2401         }
2402
2403         if (count == 0) {
2404                 vnnmap->map[0] = state->destnode;
2405         } else {
2406                 count = 0;
2407                 for (i=0; i<state->nodemap->num; i++) {
2408                         if (state->nodemap->node[i].flags &
2409                             NODE_FLAGS_INACTIVE) {
2410                                 continue;
2411                         }
2412                         if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2413                                 continue;
2414                         }
2415
2416                         vnnmap->map[count] = state->nodemap->node[i].pnn;
2417                         count += 1;
2418                 }
2419         }
2420
2421         vnnmap->generation = state->generation;
2422
2423         talloc_free(state->vnnmap);
2424         state->vnnmap = vnnmap;
2425
2426         ctdb_req_control_start_recovery(&request);
2427         subreq = ctdb_client_control_multi_send(state, state->ev,
2428                                                 state->client,
2429                                                 state->pnn_list, state->count,
2430                                                 TIMEOUT(), &request);
2431         if (tevent_req_nomem(subreq, req)) {
2432                 return;
2433         }
2434         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2435 }
2436
2437 static void recovery_start_recovery_done(struct tevent_req *subreq)
2438 {
2439         struct tevent_req *req = tevent_req_callback_data(
2440                 subreq, struct tevent_req);
2441         struct recovery_state *state = tevent_req_data(
2442                 req, struct recovery_state);
2443         struct ctdb_req_control request;
2444         int *err_list;
2445         int ret;
2446         bool status;
2447
2448         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2449                                                 NULL);
2450         TALLOC_FREE(subreq);
2451         if (! status) {
2452                 int ret2;
2453                 uint32_t pnn;
2454
2455                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2456                                                        state->count,
2457                                                        err_list, &pnn);
2458                 if (ret2 != 0) {
2459                         D_ERR("failed to run start_recovery event on node %u,"
2460                               " ret=%d\n", pnn, ret2);
2461                 } else {
2462                         D_ERR("failed to run start_recovery event, ret=%d\n",
2463                               ret);
2464                 }
2465                 tevent_req_error(req, ret);
2466                 return;
2467         }
2468
2469         D_ERR("start_recovery event finished\n");
2470
2471         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2472         subreq = ctdb_client_control_multi_send(state, state->ev,
2473                                                 state->client,
2474                                                 state->pnn_list, state->count,
2475                                                 TIMEOUT(), &request);
2476         if (tevent_req_nomem(subreq, req)) {
2477                 return;
2478         }
2479         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2480 }
2481
2482 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2483 {
2484         struct tevent_req *req = tevent_req_callback_data(
2485                 subreq, struct tevent_req);
2486         struct recovery_state *state = tevent_req_data(
2487                 req, struct recovery_state);
2488         int *err_list;
2489         int ret;
2490         bool status;
2491
2492         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2493                                                 NULL);
2494         TALLOC_FREE(subreq);
2495         if (! status) {
2496                 int ret2;
2497                 uint32_t pnn;
2498
2499                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2500                                                        state->count,
2501                                                        err_list, &pnn);
2502                 if (ret2 != 0) {
2503                         D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2504                               pnn, ret2);
2505                 } else {
2506                         D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2507                 }
2508                 tevent_req_error(req, ret);
2509                 return;
2510         }
2511
2512         D_NOTICE("updated VNNMAP\n");
2513
2514         subreq = db_recovery_send(state, state->ev, state->client,
2515                                   state->dbmap, state->tun_list,
2516                                   state->pnn_list, state->count,
2517                                   state->caps, state->ban_credits,
2518                                   state->vnnmap->generation);
2519         if (tevent_req_nomem(subreq, req)) {
2520                 return;
2521         }
2522         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2523 }
2524
2525 static void recovery_db_recovery_done(struct tevent_req *subreq)
2526 {
2527         struct tevent_req *req = tevent_req_callback_data(
2528                 subreq, struct tevent_req);
2529         struct recovery_state *state = tevent_req_data(
2530                 req, struct recovery_state);
2531         struct ctdb_req_control request;
2532         bool status;
2533         int count;
2534
2535         status = db_recovery_recv(subreq, &count);
2536         TALLOC_FREE(subreq);
2537
2538         D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2539
2540         if (! status) {
2541                 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2542                 int i;
2543
2544                 /* Bans are not enabled */
2545                 if (state->tun_list->enable_bans == 0) {
2546                         tevent_req_error(req, EIO);
2547                         return;
2548                 }
2549
2550                 for (i=0; i<state->count; i++) {
2551                         uint32_t pnn;
2552                         pnn = state->pnn_list[i];
2553                         if (state->ban_credits[pnn] > max_credits) {
2554                                 max_pnn = pnn;
2555                                 max_credits = state->ban_credits[pnn];
2556                         }
2557                 }
2558
2559                 /* If pulling database fails multiple times */
2560                 if (max_credits >= NUM_RETRIES) {
2561                         struct ctdb_req_message message;
2562
2563                         D_ERR("Assigning banning credits to node %u\n",
2564                               max_pnn);
2565
2566                         message.srvid = CTDB_SRVID_BANNING;
2567                         message.data.pnn = max_pnn;
2568
2569                         subreq = ctdb_client_message_send(
2570                                         state, state->ev, state->client,
2571                                         ctdb_client_pnn(state->client),
2572                                         &message);
2573                         if (tevent_req_nomem(subreq, req)) {
2574                                 return;
2575                         }
2576                         tevent_req_set_callback(subreq, recovery_failed_done,
2577                                                 req);
2578                 } else {
2579                         tevent_req_error(req, EIO);
2580                 }
2581                 return;
2582         }
2583
2584         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2585         subreq = ctdb_client_control_multi_send(state, state->ev,
2586                                                 state->client,
2587                                                 state->pnn_list, state->count,
2588                                                 TIMEOUT(), &request);
2589         if (tevent_req_nomem(subreq, req)) {
2590                 return;
2591         }
2592         tevent_req_set_callback(subreq, recovery_normal_done, req);
2593 }
2594
2595 static void recovery_failed_done(struct tevent_req *subreq)
2596 {
2597         struct tevent_req *req = tevent_req_callback_data(
2598                 subreq, struct tevent_req);
2599         int ret;
2600         bool status;
2601
2602         status = ctdb_client_message_recv(subreq, &ret);
2603         TALLOC_FREE(subreq);
2604         if (! status) {
2605                 D_ERR("failed to assign banning credits, ret=%d\n", ret);
2606         }
2607
2608         tevent_req_error(req, EIO);
2609 }
2610
2611 static void recovery_normal_done(struct tevent_req *subreq)
2612 {
2613         struct tevent_req *req = tevent_req_callback_data(
2614                 subreq, struct tevent_req);
2615         struct recovery_state *state = tevent_req_data(
2616                 req, struct recovery_state);
2617         struct ctdb_req_control request;
2618         int *err_list;
2619         int ret;
2620         bool status;
2621
2622         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2623                                                 NULL);
2624         TALLOC_FREE(subreq);
2625         if (! status) {
2626                 int ret2;
2627                 uint32_t pnn;
2628
2629                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2630                                                        state->count,
2631                                                        err_list, &pnn);
2632                 if (ret2 != 0) {
2633                         D_ERR("failed to set recovery mode NORMAL on node %u,"
2634                               " ret=%d\n", pnn, ret2);
2635                 } else {
2636                         D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2637                               ret);
2638                 }
2639                 tevent_req_error(req, ret);
2640                 return;
2641         }
2642
2643         D_ERR("Set recovery mode to NORMAL\n");
2644
2645         ctdb_req_control_end_recovery(&request);
2646         subreq = ctdb_client_control_multi_send(state, state->ev,
2647                                                 state->client,
2648                                                 state->pnn_list, state->count,
2649                                                 TIMEOUT(), &request);
2650         if (tevent_req_nomem(subreq, req)) {
2651                 return;
2652         }
2653         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2654 }
2655
2656 static void recovery_end_recovery_done(struct tevent_req *subreq)
2657 {
2658         struct tevent_req *req = tevent_req_callback_data(
2659                 subreq, struct tevent_req);
2660         struct recovery_state *state = tevent_req_data(
2661                 req, struct recovery_state);
2662         int *err_list;
2663         int ret;
2664         bool status;
2665
2666         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2667                                                 NULL);
2668         TALLOC_FREE(subreq);
2669         if (! status) {
2670                 int ret2;
2671                 uint32_t pnn;
2672
2673                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2674                                                        state->count,
2675                                                        err_list, &pnn);
2676                 if (ret2 != 0) {
2677                         D_ERR("failed to run recovered event on node %u,"
2678                               " ret=%d\n", pnn, ret2);
2679                 } else {
2680                         D_ERR("failed to run recovered event, ret=%d\n", ret);
2681                 }
2682                 tevent_req_error(req, ret);
2683                 return;
2684         }
2685
2686         D_ERR("recovered event finished\n");
2687
2688         tevent_req_done(req);
2689 }
2690
/*
 * Collect the result of a recovery request.
 *
 * If perr is not NULL it receives the unix error of a failed request, or
 * 0 when the request succeeded.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        if (generic_recv(req, perr)) {
                /*
                 * generic_recv() does not touch *perr on success, so store
                 * an explicit 0 rather than leaving the caller with
                 * whatever value the variable held before.
                 */
                if (perr != NULL) {
                        *perr = 0;
                }
        }
}
2695
/*
 * Print the command line synopsis to stderr.
 */
static void usage(const char *progname)
{
        fprintf(stderr,
                "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
                progname);
}
2701
2702
2703 /*
2704  * Arguments - log fd, write fd, socket path, generation
2705  */
2706 int main(int argc, char *argv[])
2707 {
2708         int write_fd;
2709         const char *sockpath;
2710         TALLOC_CTX *mem_ctx;
2711         struct tevent_context *ev;
2712         struct ctdb_client_context *client;
2713         int ret;
2714         struct tevent_req *req;
2715         uint32_t generation;
2716
2717         if (argc != 4) {
2718                 usage(argv[0]);
2719                 exit(1);
2720         }
2721
2722         write_fd = atoi(argv[1]);
2723         sockpath = argv[2];
2724         generation = (uint32_t)strtoul(argv[3], NULL, 0);
2725
2726         mem_ctx = talloc_new(NULL);
2727         if (mem_ctx == NULL) {
2728                 fprintf(stderr, "recovery: talloc_new() failed\n");
2729                 goto failed;
2730         }
2731
2732         ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2733         if (ret != 0) {
2734                 fprintf(stderr, "recovery: Unable to initialize logging\n");
2735                 goto failed;
2736         }
2737
2738         ev = tevent_context_init(mem_ctx);
2739         if (ev == NULL) {
2740                 D_ERR("tevent_context_init() failed\n");
2741                 goto failed;
2742         }
2743
2744         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2745         if (ret != 0) {
2746                 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2747                 goto failed;
2748         }
2749
2750         req = recovery_send(mem_ctx, ev, client, generation);
2751         if (req == NULL) {
2752                 D_ERR("database_recover_send() failed\n");
2753                 goto failed;
2754         }
2755
2756         if (! tevent_req_poll(req, ev)) {
2757                 D_ERR("tevent_req_poll() failed\n");
2758                 goto failed;
2759         }
2760
2761         recovery_recv(req, &ret);
2762         TALLOC_FREE(req);
2763         if (ret != 0) {
2764                 D_ERR("database recovery failed, ret=%d\n", ret);
2765                 goto failed;
2766         }
2767
2768         sys_write(write_fd, &ret, sizeof(ret));
2769         return 0;
2770
2771 failed:
2772         TALLOC_FREE(mem_ctx);
2773         return 1;
2774 }