ctdb-protocol: Fix marshalling for ctdb_rec_buffer
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
33
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
37
38 #include "common/logging.h"
39
/* Seconds allowed for each control; consumed by TIMEOUT() below */
static int recover_timeout = 30;

/* Retry limit (its users are not in this part of the file) */
#define NUM_RETRIES     3

/* Deadline lying recover_timeout seconds after the current time */
#define TIMEOUT()       (timeval_current_ofs(recover_timeout, 0))
45
46 /*
47  * Utility functions
48  */
49
/*
 * Shared "recv" step for requests that produce no result data.
 *
 * Returns true when the request finished without a unix error.  Otherwise
 * returns false and, if perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
63
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
65
66 static uint64_t srvid_next(void)
67 {
68         rec_srvid += 1;
69         return rec_srvid;
70 }
71
72 /*
73  * Recovery database functions
74  */
75
76 struct recdb_context {
77         uint32_t db_id;
78         const char *db_name;
79         const char *db_path;
80         struct tdb_wrap *db;
81         bool persistent;
82 };
83
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
85                                           const char *db_name,
86                                           const char *db_path,
87                                           uint32_t hash_size, bool persistent)
88 {
89         static char *db_dir_state = NULL;
90         struct recdb_context *recdb;
91         unsigned int tdb_flags;
92
93         recdb = talloc(mem_ctx, struct recdb_context);
94         if (recdb == NULL) {
95                 return NULL;
96         }
97
98         if (db_dir_state == NULL) {
99                 db_dir_state = getenv("CTDB_DBDIR_STATE");
100         }
101
102         recdb->db_name = db_name;
103         recdb->db_id = db_id;
104         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105                                          db_dir_state != NULL ?
106                                             db_dir_state :
107                                             dirname(discard_const(db_path)),
108                                          db_name);
109         if (recdb->db_path == NULL) {
110                 talloc_free(recdb);
111                 return NULL;
112         }
113         unlink(recdb->db_path);
114
115         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
116         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118         if (recdb->db == NULL) {
119                 talloc_free(recdb);
120                 D_ERR("failed to create recovery db %s\n", recdb->db_path);
121                 return NULL;
122         }
123
124         recdb->persistent = persistent;
125
126         return recdb;
127 }
128
129 static uint32_t recdb_id(struct recdb_context *recdb)
130 {
131         return recdb->db_id;
132 }
133
134 static const char *recdb_name(struct recdb_context *recdb)
135 {
136         return recdb->db_name;
137 }
138
139 static const char *recdb_path(struct recdb_context *recdb)
140 {
141         return recdb->db_path;
142 }
143
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
145 {
146         return recdb->db->tdb;
147 }
148
149 static bool recdb_persistent(struct recdb_context *recdb)
150 {
151         return recdb->persistent;
152 }
153
154 struct recdb_add_traverse_state {
155         struct recdb_context *recdb;
156         int mypnn;
157 };
158
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160                               TDB_DATA key, TDB_DATA data,
161                               void *private_data)
162 {
163         struct recdb_add_traverse_state *state =
164                 (struct recdb_add_traverse_state *)private_data;
165         struct ctdb_ltdb_header *hdr;
166         TDB_DATA prev_data;
167         int ret;
168
169         /* header is not marshalled separately in the pulldb control */
170         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
171                 return -1;
172         }
173
174         hdr = (struct ctdb_ltdb_header *)data.dptr;
175
176         /* fetch the existing record, if any */
177         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
178
179         if (prev_data.dptr != NULL) {
180                 struct ctdb_ltdb_header prev_hdr;
181
182                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183                 free(prev_data.dptr);
184                 if (hdr->rsn < prev_hdr.rsn ||
185                     (hdr->rsn == prev_hdr.rsn &&
186                      prev_hdr.dmaster != state->mypnn)) {
187                         return 0;
188                 }
189         }
190
191         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
192         if (ret != 0) {
193                 return -1;
194         }
195         return 0;
196 }
197
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199                       struct ctdb_rec_buffer *recbuf)
200 {
201         struct recdb_add_traverse_state state;
202         int ret;
203
204         state.recdb = recdb;
205         state.mypnn = mypnn;
206
207         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
208         if (ret != 0) {
209                 return false;
210         }
211
212         return true;
213 }
214
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217                              uint32_t reqid, uint32_t dmaster,
218                              TDB_DATA key, TDB_DATA data)
219 {
220         struct ctdb_ltdb_header *header;
221         int ret;
222
223         /* Skip empty records */
224         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
225                 return 0;
226         }
227
228         /* update the dmaster field to point to us */
229         header = (struct ctdb_ltdb_header *)data.dptr;
230         if (!persistent) {
231                 header->dmaster = dmaster;
232                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
233         }
234
235         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
236         if (ret != 0) {
237                 return ret;
238         }
239
240         return 0;
241 }
242
243 struct recdb_records_traverse_state {
244         struct ctdb_rec_buffer *recbuf;
245         uint32_t dmaster;
246         uint32_t reqid;
247         bool persistent;
248         bool failed;
249 };
250
251 static int recdb_records_traverse(struct tdb_context *tdb,
252                                   TDB_DATA key, TDB_DATA data,
253                                   void *private_data)
254 {
255         struct recdb_records_traverse_state *state =
256                 (struct recdb_records_traverse_state *)private_data;
257         int ret;
258
259         ret = recbuf_filter_add(state->recbuf, state->persistent,
260                                 state->reqid, state->dmaster, key, data);
261         if (ret != 0) {
262                 state->failed = true;
263                 return ret;
264         }
265
266         return 0;
267 }
268
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
270                                              TALLOC_CTX *mem_ctx,
271                                              uint32_t dmaster)
272 {
273         struct recdb_records_traverse_state state;
274         int ret;
275
276         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277         if (state.recbuf == NULL) {
278                 return NULL;
279         }
280         state.dmaster = dmaster;
281         state.reqid = 0;
282         state.persistent = recdb_persistent(recdb);
283         state.failed = false;
284
285         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
286                                 &state);
287         if (ret == -1 || state.failed) {
288                 D_ERR("Failed to marshall recovery records for %s\n",
289                       recdb_name(recdb));
290                 TALLOC_FREE(state.recbuf);
291                 return NULL;
292         }
293
294         return state.recbuf;
295 }
296
297 struct recdb_file_traverse_state {
298         struct ctdb_rec_buffer *recbuf;
299         struct recdb_context *recdb;
300         TALLOC_CTX *mem_ctx;
301         uint32_t dmaster;
302         uint32_t reqid;
303         bool persistent;
304         bool failed;
305         int fd;
306         int max_size;
307         int num_buffers;
308 };
309
310 static int recdb_file_traverse(struct tdb_context *tdb,
311                                TDB_DATA key, TDB_DATA data,
312                                void *private_data)
313 {
314         struct recdb_file_traverse_state *state =
315                 (struct recdb_file_traverse_state *)private_data;
316         int ret;
317
318         ret = recbuf_filter_add(state->recbuf, state->persistent,
319                                 state->reqid, state->dmaster, key, data);
320         if (ret != 0) {
321                 state->failed = true;
322                 return ret;
323         }
324
325         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
327                 if (ret != 0) {
328                         D_ERR("Failed to collect recovery records for %s\n",
329                               recdb_name(state->recdb));
330                         state->failed = true;
331                         return ret;
332                 }
333
334                 state->num_buffers += 1;
335
336                 TALLOC_FREE(state->recbuf);
337                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338                                                      recdb_id(state->recdb));
339                 if (state->recbuf == NULL) {
340                         state->failed = true;
341                         return ENOMEM;
342                 }
343         }
344
345         return 0;
346 }
347
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349                       uint32_t dmaster, int fd, int max_size)
350 {
351         struct recdb_file_traverse_state state;
352         int ret;
353
354         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355         if (state.recbuf == NULL) {
356                 return -1;
357         }
358         state.recdb = recdb;
359         state.mem_ctx = mem_ctx;
360         state.dmaster = dmaster;
361         state.reqid = 0;
362         state.persistent = recdb_persistent(recdb);
363         state.failed = false;
364         state.fd = fd;
365         state.max_size = max_size;
366         state.num_buffers = 0;
367
368         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369         if (ret == -1 || state.failed) {
370                 TALLOC_FREE(state.recbuf);
371                 return -1;
372         }
373
374         ret = ctdb_rec_buffer_write(state.recbuf, fd);
375         if (ret != 0) {
376                 D_ERR("Failed to collect recovery records for %s\n",
377                       recdb_name(recdb));
378                 TALLOC_FREE(state.recbuf);
379                 return -1;
380         }
381         state.num_buffers += 1;
382
383         D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384                 state.num_buffers, recdb_name(recdb));
385
386         return state.num_buffers;
387 }
388
389 /*
390  * Pull database from a single node
391  */
392
393 struct pull_database_state {
394         struct tevent_context *ev;
395         struct ctdb_client_context *client;
396         struct recdb_context *recdb;
397         uint32_t pnn;
398         uint64_t srvid;
399         int num_records;
400 };
401
402 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
403                                   void *private_data);
404 static void pull_database_register_done(struct tevent_req *subreq);
405 static void pull_database_old_done(struct tevent_req *subreq);
406 static void pull_database_unregister_done(struct tevent_req *subreq);
407 static void pull_database_new_done(struct tevent_req *subreq);
408
409 static struct tevent_req *pull_database_send(
410                         TALLOC_CTX *mem_ctx,
411                         struct tevent_context *ev,
412                         struct ctdb_client_context *client,
413                         uint32_t pnn, uint32_t caps,
414                         struct recdb_context *recdb)
415 {
416         struct tevent_req *req, *subreq;
417         struct pull_database_state *state;
418         struct ctdb_req_control request;
419
420         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
421         if (req == NULL) {
422                 return NULL;
423         }
424
425         state->ev = ev;
426         state->client = client;
427         state->recdb = recdb;
428         state->pnn = pnn;
429         state->srvid = srvid_next();
430
431         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
432                 subreq = ctdb_client_set_message_handler_send(
433                                         state, state->ev, state->client,
434                                         state->srvid, pull_database_handler,
435                                         req);
436                 if (tevent_req_nomem(subreq, req)) {
437                         return tevent_req_post(req, ev);
438                 }
439
440                 tevent_req_set_callback(subreq, pull_database_register_done,
441                                         req);
442
443         } else {
444                 struct ctdb_pulldb pulldb;
445
446                 pulldb.db_id = recdb_id(recdb);
447                 pulldb.lmaster = CTDB_LMASTER_ANY;
448
449                 ctdb_req_control_pull_db(&request, &pulldb);
450                 subreq = ctdb_client_control_send(state, state->ev,
451                                                   state->client,
452                                                   pnn, TIMEOUT(),
453                                                   &request);
454                 if (tevent_req_nomem(subreq, req)) {
455                         return tevent_req_post(req, ev);
456                 }
457                 tevent_req_set_callback(subreq, pull_database_old_done, req);
458         }
459
460         return req;
461 }
462
463 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
464                                   void *private_data)
465 {
466         struct tevent_req *req = talloc_get_type_abort(
467                 private_data, struct tevent_req);
468         struct pull_database_state *state = tevent_req_data(
469                 req, struct pull_database_state);
470         struct ctdb_rec_buffer *recbuf;
471         size_t np;
472         int ret;
473         bool status;
474
475         if (srvid != state->srvid) {
476                 return;
477         }
478
479         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
480         if (ret != 0) {
481                 D_ERR("Invalid data received for DB_PULL messages\n");
482                 return;
483         }
484
485         if (recbuf->db_id != recdb_id(state->recdb)) {
486                 talloc_free(recbuf);
487                 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
488                       recbuf->db_id, recdb_name(state->recdb));
489                 return;
490         }
491
492         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
493                            recbuf);
494         if (! status) {
495                 talloc_free(recbuf);
496                 D_ERR("Failed to add records to recdb for %s\n",
497                       recdb_name(state->recdb));
498                 return;
499         }
500
501         state->num_records += recbuf->count;
502         talloc_free(recbuf);
503 }
504
505 static void pull_database_register_done(struct tevent_req *subreq)
506 {
507         struct tevent_req *req = tevent_req_callback_data(
508                 subreq, struct tevent_req);
509         struct pull_database_state *state = tevent_req_data(
510                 req, struct pull_database_state);
511         struct ctdb_req_control request;
512         struct ctdb_pulldb_ext pulldb_ext;
513         int ret;
514         bool status;
515
516         status = ctdb_client_set_message_handler_recv(subreq, &ret);
517         TALLOC_FREE(subreq);
518         if (! status) {
519                 D_ERR("Failed to set message handler for DB_PULL for %s\n",
520                       recdb_name(state->recdb));
521                 tevent_req_error(req, ret);
522                 return;
523         }
524
525         pulldb_ext.db_id = recdb_id(state->recdb);
526         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
527         pulldb_ext.srvid = state->srvid;
528
529         ctdb_req_control_db_pull(&request, &pulldb_ext);
530         subreq = ctdb_client_control_send(state, state->ev, state->client,
531                                           state->pnn, TIMEOUT(), &request);
532         if (tevent_req_nomem(subreq, req)) {
533                 return;
534         }
535         tevent_req_set_callback(subreq, pull_database_new_done, req);
536 }
537
538 static void pull_database_old_done(struct tevent_req *subreq)
539 {
540         struct tevent_req *req = tevent_req_callback_data(
541                 subreq, struct tevent_req);
542         struct pull_database_state *state = tevent_req_data(
543                 req, struct pull_database_state);
544         struct ctdb_reply_control *reply;
545         struct ctdb_rec_buffer *recbuf;
546         int ret;
547         bool status;
548
549         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
550         TALLOC_FREE(subreq);
551         if (! status) {
552                 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
553                       recdb_name(state->recdb), state->pnn, ret);
554                 tevent_req_error(req, ret);
555                 return;
556         }
557
558         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
559         talloc_free(reply);
560         if (ret != 0) {
561                 tevent_req_error(req, ret);
562                 return;
563         }
564
565         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
566                            recbuf);
567         if (! status) {
568                 talloc_free(recbuf);
569                 tevent_req_error(req, EIO);
570                 return;
571         }
572
573         state->num_records = recbuf->count;
574         talloc_free(recbuf);
575
576         D_INFO("Pulled %d records for db %s from node %d\n",
577                state->num_records, recdb_name(state->recdb), state->pnn);
578
579         tevent_req_done(req);
580 }
581
582 static void pull_database_new_done(struct tevent_req *subreq)
583 {
584         struct tevent_req *req = tevent_req_callback_data(
585                 subreq, struct tevent_req);
586         struct pull_database_state *state = tevent_req_data(
587                 req, struct pull_database_state);
588         struct ctdb_reply_control *reply;
589         uint32_t num_records;
590         int ret;
591         bool status;
592
593         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
594         TALLOC_FREE(subreq);
595         if (! status) {
596                 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
597                       recdb_name(state->recdb), state->pnn, ret);
598                 tevent_req_error(req, ret);
599                 return;
600         }
601
602         ret = ctdb_reply_control_db_pull(reply, &num_records);
603         talloc_free(reply);
604         if (num_records != state->num_records) {
605                 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
606                       num_records, state->num_records,
607                       recdb_name(state->recdb));
608                 tevent_req_error(req, EIO);
609                 return;
610         }
611
612         D_INFO("Pulled %d records for db %s from node %d\n",
613                state->num_records, recdb_name(state->recdb), state->pnn);
614
615         subreq = ctdb_client_remove_message_handler_send(
616                                         state, state->ev, state->client,
617                                         state->srvid, req);
618         if (tevent_req_nomem(subreq, req)) {
619                 return;
620         }
621         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
622 }
623
624 static void pull_database_unregister_done(struct tevent_req *subreq)
625 {
626         struct tevent_req *req = tevent_req_callback_data(
627                 subreq, struct tevent_req);
628         struct pull_database_state *state = tevent_req_data(
629                 req, struct pull_database_state);
630         int ret;
631         bool status;
632
633         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
634         TALLOC_FREE(subreq);
635         if (! status) {
636                 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
637                       recdb_name(state->recdb));
638                 tevent_req_error(req, ret);
639                 return;
640         }
641
642         tevent_req_done(req);
643 }
644
/* Result of a pull request: true on success, else false with *perr set */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
649
650 /*
651  * Push database to specified nodes (old style)
652  */
653
654 struct push_database_old_state {
655         struct tevent_context *ev;
656         struct ctdb_client_context *client;
657         struct recdb_context *recdb;
658         uint32_t *pnn_list;
659         int count;
660         struct ctdb_rec_buffer *recbuf;
661         int index;
662 };
663
664 static void push_database_old_push_done(struct tevent_req *subreq);
665
666 static struct tevent_req *push_database_old_send(
667                         TALLOC_CTX *mem_ctx,
668                         struct tevent_context *ev,
669                         struct ctdb_client_context *client,
670                         uint32_t *pnn_list, int count,
671                         struct recdb_context *recdb)
672 {
673         struct tevent_req *req, *subreq;
674         struct push_database_old_state *state;
675         struct ctdb_req_control request;
676         uint32_t pnn;
677
678         req = tevent_req_create(mem_ctx, &state,
679                                 struct push_database_old_state);
680         if (req == NULL) {
681                 return NULL;
682         }
683
684         state->ev = ev;
685         state->client = client;
686         state->recdb = recdb;
687         state->pnn_list = pnn_list;
688         state->count = count;
689         state->index = 0;
690
691         state->recbuf = recdb_records(recdb, state,
692                                       ctdb_client_pnn(client));
693         if (tevent_req_nomem(state->recbuf, req)) {
694                 return tevent_req_post(req, ev);
695         }
696
697         pnn = state->pnn_list[state->index];
698
699         ctdb_req_control_push_db(&request, state->recbuf);
700         subreq = ctdb_client_control_send(state, ev, client, pnn,
701                                           TIMEOUT(), &request);
702         if (tevent_req_nomem(subreq, req)) {
703                 return tevent_req_post(req, ev);
704         }
705         tevent_req_set_callback(subreq, push_database_old_push_done, req);
706
707         return req;
708 }
709
710 static void push_database_old_push_done(struct tevent_req *subreq)
711 {
712         struct tevent_req *req = tevent_req_callback_data(
713                 subreq, struct tevent_req);
714         struct push_database_old_state *state = tevent_req_data(
715                 req, struct push_database_old_state);
716         struct ctdb_req_control request;
717         uint32_t pnn;
718         int ret;
719         bool status;
720
721         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
722         TALLOC_FREE(subreq);
723         if (! status) {
724                 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
725                       recdb_name(state->recdb), state->pnn_list[state->index],
726                       ret);
727                 tevent_req_error(req, ret);
728                 return;
729         }
730
731         state->index += 1;
732         if (state->index == state->count) {
733                 TALLOC_FREE(state->recbuf);
734                 tevent_req_done(req);
735                 return;
736         }
737
738         pnn = state->pnn_list[state->index];
739
740         ctdb_req_control_push_db(&request, state->recbuf);
741         subreq = ctdb_client_control_send(state, state->ev, state->client,
742                                           pnn, TIMEOUT(), &request);
743         if (tevent_req_nomem(subreq, req)) {
744                 return;
745         }
746         tevent_req_set_callback(subreq, push_database_old_push_done, req);
747 }
748
/* Result of an old-style push: true on success, else false with *perr set */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
753
754 /*
755  * Push database to specified nodes (new style)
756  */
757
758 struct push_database_new_state {
759         struct tevent_context *ev;
760         struct ctdb_client_context *client;
761         struct recdb_context *recdb;
762         uint32_t *pnn_list;
763         int count;
764         uint64_t srvid;
765         uint32_t dmaster;
766         int fd;
767         int num_buffers;
768         int num_buffers_sent;
769         int num_records;
770 };
771
772 static void push_database_new_started(struct tevent_req *subreq);
773 static void push_database_new_send_msg(struct tevent_req *req);
774 static void push_database_new_send_done(struct tevent_req *subreq);
775 static void push_database_new_confirmed(struct tevent_req *subreq);
776
777 static struct tevent_req *push_database_new_send(
778                         TALLOC_CTX *mem_ctx,
779                         struct tevent_context *ev,
780                         struct ctdb_client_context *client,
781                         uint32_t *pnn_list, int count,
782                         struct recdb_context *recdb,
783                         int max_size)
784 {
785         struct tevent_req *req, *subreq;
786         struct push_database_new_state *state;
787         struct ctdb_req_control request;
788         struct ctdb_pulldb_ext pulldb_ext;
789         char *filename;
790         off_t offset;
791
792         req = tevent_req_create(mem_ctx, &state,
793                                 struct push_database_new_state);
794         if (req == NULL) {
795                 return NULL;
796         }
797
798         state->ev = ev;
799         state->client = client;
800         state->recdb = recdb;
801         state->pnn_list = pnn_list;
802         state->count = count;
803
804         state->srvid = srvid_next();
805         state->dmaster = ctdb_client_pnn(client);
806         state->num_buffers_sent = 0;
807         state->num_records = 0;
808
809         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
810         if (tevent_req_nomem(filename, req)) {
811                 return tevent_req_post(req, ev);
812         }
813
814         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
815         if (state->fd == -1) {
816                 tevent_req_error(req, errno);
817                 return tevent_req_post(req, ev);
818         }
819         unlink(filename);
820         talloc_free(filename);
821
822         state->num_buffers = recdb_file(recdb, state, state->dmaster,
823                                         state->fd, max_size);
824         if (state->num_buffers == -1) {
825                 tevent_req_error(req, ENOMEM);
826                 return tevent_req_post(req, ev);
827         }
828
829         offset = lseek(state->fd, 0, SEEK_SET);
830         if (offset != 0) {
831                 tevent_req_error(req, EIO);
832                 return tevent_req_post(req, ev);
833         }
834
835         pulldb_ext.db_id = recdb_id(recdb);
836         pulldb_ext.srvid = state->srvid;
837
838         ctdb_req_control_db_push_start(&request, &pulldb_ext);
839         subreq = ctdb_client_control_multi_send(state, ev, client,
840                                                 pnn_list, count,
841                                                 TIMEOUT(), &request);
842         if (tevent_req_nomem(subreq, req)) {
843                 return tevent_req_post(req, ev);
844         }
845         tevent_req_set_callback(subreq, push_database_new_started, req);
846
847         return req;
848 }
849
850 static void push_database_new_started(struct tevent_req *subreq)
851 {
852         struct tevent_req *req = tevent_req_callback_data(
853                 subreq, struct tevent_req);
854         struct push_database_new_state *state = tevent_req_data(
855                 req, struct push_database_new_state);
856         int *err_list;
857         int ret;
858         bool status;
859
860         status = ctdb_client_control_multi_recv(subreq, &ret, state,
861                                                 &err_list, NULL);
862         TALLOC_FREE(subreq);
863         if (! status) {
864                 int ret2;
865                 uint32_t pnn;
866
867                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
868                                                        state->count,
869                                                        err_list, &pnn);
870                 if (ret2 != 0) {
871                         D_ERR("control DB_PUSH_START failed for db %s"
872                               " on node %u, ret=%d\n",
873                               recdb_name(state->recdb), pnn, ret2);
874                 } else {
875                         D_ERR("control DB_PUSH_START failed for db %s,"
876                               " ret=%d\n",
877                               recdb_name(state->recdb), ret);
878                 }
879                 talloc_free(err_list);
880
881                 tevent_req_error(req, ret);
882                 return;
883         }
884
885         push_database_new_send_msg(req);
886 }
887
888 static void push_database_new_send_msg(struct tevent_req *req)
889 {
890         struct push_database_new_state *state = tevent_req_data(
891                 req, struct push_database_new_state);
892         struct tevent_req *subreq;
893         struct ctdb_rec_buffer *recbuf;
894         struct ctdb_req_message message;
895         TDB_DATA data;
896         size_t np;
897         int ret;
898
899         if (state->num_buffers_sent == state->num_buffers) {
900                 struct ctdb_req_control request;
901
902                 ctdb_req_control_db_push_confirm(&request,
903                                                  recdb_id(state->recdb));
904                 subreq = ctdb_client_control_multi_send(state, state->ev,
905                                                         state->client,
906                                                         state->pnn_list,
907                                                         state->count,
908                                                         TIMEOUT(), &request);
909                 if (tevent_req_nomem(subreq, req)) {
910                         return;
911                 }
912                 tevent_req_set_callback(subreq, push_database_new_confirmed,
913                                         req);
914                 return;
915         }
916
917         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
918         if (ret != 0) {
919                 tevent_req_error(req, ret);
920                 return;
921         }
922
923         data.dsize = ctdb_rec_buffer_len(recbuf);
924         data.dptr = talloc_size(state, data.dsize);
925         if (tevent_req_nomem(data.dptr, req)) {
926                 return;
927         }
928
929         ctdb_rec_buffer_push(recbuf, data.dptr, &np);
930
931         message.srvid = state->srvid;
932         message.data.data = data;
933
934         D_DEBUG("Pushing buffer %d with %d records for db %s\n",
935                 state->num_buffers_sent, recbuf->count,
936                 recdb_name(state->recdb));
937
938         subreq = ctdb_client_message_multi_send(state, state->ev,
939                                                 state->client,
940                                                 state->pnn_list, state->count,
941                                                 &message);
942         if (tevent_req_nomem(subreq, req)) {
943                 return;
944         }
945         tevent_req_set_callback(subreq, push_database_new_send_done, req);
946
947         state->num_records += recbuf->count;
948
949         talloc_free(data.dptr);
950         talloc_free(recbuf);
951 }
952
953 static void push_database_new_send_done(struct tevent_req *subreq)
954 {
955         struct tevent_req *req = tevent_req_callback_data(
956                 subreq, struct tevent_req);
957         struct push_database_new_state *state = tevent_req_data(
958                 req, struct push_database_new_state);
959         bool status;
960         int ret;
961
962         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
963         TALLOC_FREE(subreq);
964         if (! status) {
965                 D_ERR("Sending recovery records failed for %s\n",
966                       recdb_name(state->recdb));
967                 tevent_req_error(req, ret);
968                 return;
969         }
970
971         state->num_buffers_sent += 1;
972
973         push_database_new_send_msg(req);
974 }
975
976 static void push_database_new_confirmed(struct tevent_req *subreq)
977 {
978         struct tevent_req *req = tevent_req_callback_data(
979                 subreq, struct tevent_req);
980         struct push_database_new_state *state = tevent_req_data(
981                 req, struct push_database_new_state);
982         struct ctdb_reply_control **reply;
983         int *err_list;
984         bool status;
985         int ret, i;
986         uint32_t num_records;
987
988         status = ctdb_client_control_multi_recv(subreq, &ret, state,
989                                                 &err_list, &reply);
990         TALLOC_FREE(subreq);
991         if (! status) {
992                 int ret2;
993                 uint32_t pnn;
994
995                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
996                                                        state->count, err_list,
997                                                        &pnn);
998                 if (ret2 != 0) {
999                         D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1000                               " on node %u, ret=%d\n",
1001                               recdb_name(state->recdb), pnn, ret2);
1002                 } else {
1003                         D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1004                               " ret=%d\n",
1005                               recdb_name(state->recdb), ret);
1006                 }
1007                 tevent_req_error(req, ret);
1008                 return;
1009         }
1010
1011         for (i=0; i<state->count; i++) {
1012                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1013                                                          &num_records);
1014                 if (ret != 0) {
1015                         tevent_req_error(req, EPROTO);
1016                         return;
1017                 }
1018
1019                 if (num_records != state->num_records) {
1020                         D_ERR("Node %u received %d of %d records for %s\n",
1021                               state->pnn_list[i], num_records,
1022                               state->num_records, recdb_name(state->recdb));
1023                         tevent_req_error(req, EPROTO);
1024                         return;
1025                 }
1026         }
1027
1028         talloc_free(reply);
1029
1030         D_INFO("Pushed %d records for db %s\n",
1031                state->num_records, recdb_name(state->recdb));
1032
1033         tevent_req_done(req);
1034 }
1035
/*
 * Collect the result of push_database_new_send().
 *
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1040
1041 /*
1042  * wrapper for push_database_old and push_database_new
1043  */
1044
/*
 * State for the push_database wrapper request: tracks which of the two
 * push variants has finished (or was not needed).
 */
struct push_database_state {
        /* push to nodes without fragmented-control support is complete */
        bool old_done;
        /* push to nodes with fragmented-control support is complete */
        bool new_done;
};
1048
/* Completion callbacks for the two sub-requests started by push_database_send() */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1051
1052 static struct tevent_req *push_database_send(
1053                         TALLOC_CTX *mem_ctx,
1054                         struct tevent_context *ev,
1055                         struct ctdb_client_context *client,
1056                         uint32_t *pnn_list, int count, uint32_t *caps,
1057                         struct ctdb_tunable_list *tun_list,
1058                         struct recdb_context *recdb)
1059 {
1060         struct tevent_req *req, *subreq;
1061         struct push_database_state *state;
1062         uint32_t *old_list, *new_list;
1063         int old_count, new_count;
1064         int i;
1065
1066         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1067         if (req == NULL) {
1068                 return NULL;
1069         }
1070
1071         state->old_done = false;
1072         state->new_done = false;
1073
1074         old_count = 0;
1075         new_count = 0;
1076         old_list = talloc_array(state, uint32_t, count);
1077         new_list = talloc_array(state, uint32_t, count);
1078         if (tevent_req_nomem(old_list, req) ||
1079             tevent_req_nomem(new_list,req)) {
1080                 return tevent_req_post(req, ev);
1081         }
1082
1083         for (i=0; i<count; i++) {
1084                 uint32_t pnn = pnn_list[i];
1085
1086                 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1087                         new_list[new_count] = pnn;
1088                         new_count += 1;
1089                 } else {
1090                         old_list[old_count] = pnn;
1091                         old_count += 1;
1092                 }
1093         }
1094
1095         if (old_count > 0) {
1096                 subreq = push_database_old_send(state, ev, client,
1097                                                 old_list, old_count, recdb);
1098                 if (tevent_req_nomem(subreq, req)) {
1099                         return tevent_req_post(req, ev);
1100                 }
1101                 tevent_req_set_callback(subreq, push_database_old_done, req);
1102         } else {
1103                 state->old_done = true;
1104         }
1105
1106         if (new_count > 0) {
1107                 subreq = push_database_new_send(state, ev, client,
1108                                                 new_list, new_count, recdb,
1109                                                 tun_list->rec_buffer_size_limit);
1110                 if (tevent_req_nomem(subreq, req)) {
1111                         return tevent_req_post(req, ev);
1112                 }
1113                 tevent_req_set_callback(subreq, push_database_new_done, req);
1114         } else {
1115                 state->new_done = true;
1116         }
1117
1118         return req;
1119 }
1120
1121 static void push_database_old_done(struct tevent_req *subreq)
1122 {
1123         struct tevent_req *req = tevent_req_callback_data(
1124                 subreq, struct tevent_req);
1125         struct push_database_state *state = tevent_req_data(
1126                 req, struct push_database_state);
1127         bool status;
1128         int ret;
1129
1130         status = push_database_old_recv(subreq, &ret);
1131         if (! status) {
1132                 tevent_req_error(req, ret);
1133                 return;
1134         }
1135
1136         state->old_done = true;
1137
1138         if (state->old_done && state->new_done) {
1139                 tevent_req_done(req);
1140         }
1141 }
1142
1143 static void push_database_new_done(struct tevent_req *subreq)
1144 {
1145         struct tevent_req *req = tevent_req_callback_data(
1146                 subreq, struct tevent_req);
1147         struct push_database_state *state = tevent_req_data(
1148                 req, struct push_database_state);
1149         bool status;
1150         int ret;
1151
1152         status = push_database_new_recv(subreq, &ret);
1153         if (! status) {
1154                 tevent_req_error(req, ret);
1155                 return;
1156         }
1157
1158         state->new_done = true;
1159
1160         if (state->old_done && state->new_done) {
1161                 tevent_req_done(req);
1162         }
1163 }
1164
/*
 * Collect the result of push_database_send().
 *
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1169
1170 /*
1171  * Collect databases using highest sequence number
1172  */
1173
/*
 * State for collect_highseqnum_db: query the sequence number of a database
 * on every node and pull the database from the node with the highest one.
 */
struct collect_highseqnum_db_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        uint32_t *pnn_list;             /* nodes to query */
        int count;                      /* number of entries in pnn_list */
        uint32_t *caps;                 /* capabilities, indexed by pnn */
        uint32_t *ban_credits;          /* ban credits, indexed by pnn */
        uint32_t db_id;
        struct recdb_context *recdb;    /* recovery db the records go into */
        uint32_t max_pnn;               /* node chosen as the pull source */
};
1185
/* Completion callbacks for the two steps of collect_highseqnum_db */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1188
1189 static struct tevent_req *collect_highseqnum_db_send(
1190                         TALLOC_CTX *mem_ctx,
1191                         struct tevent_context *ev,
1192                         struct ctdb_client_context *client,
1193                         uint32_t *pnn_list, int count, uint32_t *caps,
1194                         uint32_t *ban_credits, uint32_t db_id,
1195                         struct recdb_context *recdb)
1196 {
1197         struct tevent_req *req, *subreq;
1198         struct collect_highseqnum_db_state *state;
1199         struct ctdb_req_control request;
1200
1201         req = tevent_req_create(mem_ctx, &state,
1202                                 struct collect_highseqnum_db_state);
1203         if (req == NULL) {
1204                 return NULL;
1205         }
1206
1207         state->ev = ev;
1208         state->client = client;
1209         state->pnn_list = pnn_list;
1210         state->count = count;
1211         state->caps = caps;
1212         state->ban_credits = ban_credits;
1213         state->db_id = db_id;
1214         state->recdb = recdb;
1215
1216         ctdb_req_control_get_db_seqnum(&request, db_id);
1217         subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1218                                                 state->pnn_list, state->count,
1219                                                 TIMEOUT(), &request);
1220         if (tevent_req_nomem(subreq, req)) {
1221                 return tevent_req_post(req, ev);
1222         }
1223         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1224                                 req);
1225
1226         return req;
1227 }
1228
1229 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1230 {
1231         struct tevent_req *req = tevent_req_callback_data(
1232                 subreq, struct tevent_req);
1233         struct collect_highseqnum_db_state *state = tevent_req_data(
1234                 req, struct collect_highseqnum_db_state);
1235         struct ctdb_reply_control **reply;
1236         int *err_list;
1237         bool status;
1238         int ret, i;
1239         uint64_t seqnum, max_seqnum;
1240
1241         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1242                                                 &err_list, &reply);
1243         TALLOC_FREE(subreq);
1244         if (! status) {
1245                 int ret2;
1246                 uint32_t pnn;
1247
1248                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1249                                                        state->count, err_list,
1250                                                        &pnn);
1251                 if (ret2 != 0) {
1252                         D_ERR("control GET_DB_SEQNUM failed for db %s"
1253                               " on node %u, ret=%d\n",
1254                               recdb_name(state->recdb), pnn, ret2);
1255                 } else {
1256                         D_ERR("control GET_DB_SEQNUM failed for db %s,"
1257                               " ret=%d\n",
1258                               recdb_name(state->recdb), ret);
1259                 }
1260                 tevent_req_error(req, ret);
1261                 return;
1262         }
1263
1264         max_seqnum = 0;
1265         state->max_pnn = state->pnn_list[0];
1266         for (i=0; i<state->count; i++) {
1267                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1268                 if (ret != 0) {
1269                         tevent_req_error(req, EPROTO);
1270                         return;
1271                 }
1272
1273                 if (max_seqnum < seqnum) {
1274                         max_seqnum = seqnum;
1275                         state->max_pnn = state->pnn_list[i];
1276                 }
1277         }
1278
1279         talloc_free(reply);
1280
1281         D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1282                recdb_name(state->recdb), state->max_pnn, max_seqnum);
1283
1284         subreq = pull_database_send(state, state->ev, state->client,
1285                                     state->max_pnn,
1286                                     state->caps[state->max_pnn],
1287                                     state->recdb);
1288         if (tevent_req_nomem(subreq, req)) {
1289                 return;
1290         }
1291         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1292                                 req);
1293 }
1294
1295 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1296 {
1297         struct tevent_req *req = tevent_req_callback_data(
1298                 subreq, struct tevent_req);
1299         struct collect_highseqnum_db_state *state = tevent_req_data(
1300                 req, struct collect_highseqnum_db_state);
1301         int ret;
1302         bool status;
1303
1304         status = pull_database_recv(subreq, &ret);
1305         TALLOC_FREE(subreq);
1306         if (! status) {
1307                 state->ban_credits[state->max_pnn] += 1;
1308                 tevent_req_error(req, ret);
1309                 return;
1310         }
1311
1312         tevent_req_done(req);
1313 }
1314
/*
 * Collect the result of collect_highseqnum_db_send().
 *
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1319
1320 /*
1321  * Collect all databases
1322  */
1323
/*
 * State for collect_all_db: pull a database from every node in pnn_list,
 * one node after the other.
 */
struct collect_all_db_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        uint32_t *pnn_list;             /* nodes to pull from */
        int count;                      /* number of entries in pnn_list */
        uint32_t *caps;                 /* capabilities, indexed by pnn */
        uint32_t *ban_credits;          /* ban credits, indexed by pnn */
        uint32_t db_id;
        struct recdb_context *recdb;    /* recovery db the records go into */
        struct ctdb_pulldb pulldb;      /* NOTE(review): not referenced by the
                                         * collect_all_db functions - confirm
                                         * whether it is still needed */
        int index;                      /* position in pnn_list being pulled */
};
1336
/* Completion callback for each per-node pull started by collect_all_db */
static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1338
1339 static struct tevent_req *collect_all_db_send(
1340                         TALLOC_CTX *mem_ctx,
1341                         struct tevent_context *ev,
1342                         struct ctdb_client_context *client,
1343                         uint32_t *pnn_list, int count, uint32_t *caps,
1344                         uint32_t *ban_credits, uint32_t db_id,
1345                         struct recdb_context *recdb)
1346 {
1347         struct tevent_req *req, *subreq;
1348         struct collect_all_db_state *state;
1349         uint32_t pnn;
1350
1351         req = tevent_req_create(mem_ctx, &state,
1352                                 struct collect_all_db_state);
1353         if (req == NULL) {
1354                 return NULL;
1355         }
1356
1357         state->ev = ev;
1358         state->client = client;
1359         state->pnn_list = pnn_list;
1360         state->count = count;
1361         state->caps = caps;
1362         state->ban_credits = ban_credits;
1363         state->db_id = db_id;
1364         state->recdb = recdb;
1365         state->index = 0;
1366
1367         pnn = state->pnn_list[state->index];
1368
1369         subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1370         if (tevent_req_nomem(subreq, req)) {
1371                 return tevent_req_post(req, ev);
1372         }
1373         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1374
1375         return req;
1376 }
1377
1378 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1379 {
1380         struct tevent_req *req = tevent_req_callback_data(
1381                 subreq, struct tevent_req);
1382         struct collect_all_db_state *state = tevent_req_data(
1383                 req, struct collect_all_db_state);
1384         uint32_t pnn;
1385         int ret;
1386         bool status;
1387
1388         status = pull_database_recv(subreq, &ret);
1389         TALLOC_FREE(subreq);
1390         if (! status) {
1391                 pnn = state->pnn_list[state->index];
1392                 state->ban_credits[pnn] += 1;
1393                 tevent_req_error(req, ret);
1394                 return;
1395         }
1396
1397         state->index += 1;
1398         if (state->index == state->count) {
1399                 tevent_req_done(req);
1400                 return;
1401         }
1402
1403         pnn = state->pnn_list[state->index];
1404         subreq = pull_database_send(state, state->ev, state->client,
1405                                     pnn, state->caps[pnn], state->recdb);
1406         if (tevent_req_nomem(subreq, req)) {
1407                 return;
1408         }
1409         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1410 }
1411
/*
 * Collect the result of collect_all_db_send().
 *
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1416
1417
1418 /**
1419  * For each database do the following:
1420  *  - Get DB name
1421  *  - Get DB path
1422  *  - Freeze database on all nodes
1423  *  - Start transaction on all nodes
1424  *  - Collect database from all nodes
1425  *  - Wipe database on all nodes
1426  *  - Push database to all nodes
1427  *  - Commit transaction on all nodes
1428  *  - Thaw database on all nodes
1429  */
1430
/* State carried through all steps of recovering a single database */
struct recover_db_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct ctdb_tunable_list *tun_list;
        uint32_t *pnn_list;             /* nodes taking part in the recovery */
        int count;                      /* number of entries in pnn_list */
        uint32_t *caps;                 /* capabilities, indexed by pnn */
        uint32_t *ban_credits;          /* ban credits, indexed by pnn */
        uint32_t db_id;
        uint8_t db_flags;               /* CTDB_DB_FLAGS_* of the database */

        uint32_t destnode;              /* pnn of the node this client is
                                         * connected to */
        struct ctdb_transdb transdb;    /* db_id plus the generation used as
                                         * transaction id */

        const char *db_name, *db_path;  /* filled in by the GET_DBNAME and
                                         * GETDBPATH replies */
        struct recdb_context *recdb;    /* recovery db, created once the
                                         * transaction has been started */
};
1448
/* Completion callbacks for the recover_db steps */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
1458
1459 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1460                                           struct tevent_context *ev,
1461                                           struct ctdb_client_context *client,
1462                                           struct ctdb_tunable_list *tun_list,
1463                                           uint32_t *pnn_list, int count,
1464                                           uint32_t *caps,
1465                                           uint32_t *ban_credits,
1466                                           uint32_t generation,
1467                                           uint32_t db_id, uint8_t db_flags)
1468 {
1469         struct tevent_req *req, *subreq;
1470         struct recover_db_state *state;
1471         struct ctdb_req_control request;
1472
1473         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1474         if (req == NULL) {
1475                 return NULL;
1476         }
1477
1478         state->ev = ev;
1479         state->client = client;
1480         state->tun_list = tun_list;
1481         state->pnn_list = pnn_list;
1482         state->count = count;
1483         state->caps = caps;
1484         state->ban_credits = ban_credits;
1485         state->db_id = db_id;
1486         state->db_flags = db_flags;
1487
1488         state->destnode = ctdb_client_pnn(client);
1489         state->transdb.db_id = db_id;
1490         state->transdb.tid = generation;
1491
1492         ctdb_req_control_get_dbname(&request, db_id);
1493         subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1494                                           TIMEOUT(), &request);
1495         if (tevent_req_nomem(subreq, req)) {
1496                 return tevent_req_post(req, ev);
1497         }
1498         tevent_req_set_callback(subreq, recover_db_name_done, req);
1499
1500         return req;
1501 }
1502
1503 static void recover_db_name_done(struct tevent_req *subreq)
1504 {
1505         struct tevent_req *req = tevent_req_callback_data(
1506                 subreq, struct tevent_req);
1507         struct recover_db_state *state = tevent_req_data(
1508                 req, struct recover_db_state);
1509         struct ctdb_reply_control *reply;
1510         struct ctdb_req_control request;
1511         int ret;
1512         bool status;
1513
1514         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1515         TALLOC_FREE(subreq);
1516         if (! status) {
1517                 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1518                       state->db_id, ret);
1519                 tevent_req_error(req, ret);
1520                 return;
1521         }
1522
1523         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1524         if (ret != 0) {
1525                 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1526                       state->db_id, ret);
1527                 tevent_req_error(req, EPROTO);
1528                 return;
1529         }
1530
1531         talloc_free(reply);
1532
1533         ctdb_req_control_getdbpath(&request, state->db_id);
1534         subreq = ctdb_client_control_send(state, state->ev, state->client,
1535                                           state->destnode, TIMEOUT(),
1536                                           &request);
1537         if (tevent_req_nomem(subreq, req)) {
1538                 return;
1539         }
1540         tevent_req_set_callback(subreq, recover_db_path_done, req);
1541 }
1542
1543 static void recover_db_path_done(struct tevent_req *subreq)
1544 {
1545         struct tevent_req *req = tevent_req_callback_data(
1546                 subreq, struct tevent_req);
1547         struct recover_db_state *state = tevent_req_data(
1548                 req, struct recover_db_state);
1549         struct ctdb_reply_control *reply;
1550         struct ctdb_req_control request;
1551         int ret;
1552         bool status;
1553
1554         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1555         TALLOC_FREE(subreq);
1556         if (! status) {
1557                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1558                       state->db_name, ret);
1559                 tevent_req_error(req, ret);
1560                 return;
1561         }
1562
1563         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1564         if (ret != 0) {
1565                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566                       state->db_name, ret);
1567                 tevent_req_error(req, EPROTO);
1568                 return;
1569         }
1570
1571         talloc_free(reply);
1572
1573         ctdb_req_control_db_freeze(&request, state->db_id);
1574         subreq = ctdb_client_control_multi_send(state, state->ev,
1575                                                 state->client,
1576                                                 state->pnn_list, state->count,
1577                                                 TIMEOUT(), &request);
1578         if (tevent_req_nomem(subreq, req)) {
1579                 return;
1580         }
1581         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1582 }
1583
1584 static void recover_db_freeze_done(struct tevent_req *subreq)
1585 {
1586         struct tevent_req *req = tevent_req_callback_data(
1587                 subreq, struct tevent_req);
1588         struct recover_db_state *state = tevent_req_data(
1589                 req, struct recover_db_state);
1590         struct ctdb_req_control request;
1591         int *err_list;
1592         int ret;
1593         bool status;
1594
1595         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1596                                                 NULL);
1597         TALLOC_FREE(subreq);
1598         if (! status) {
1599                 int ret2;
1600                 uint32_t pnn;
1601
1602                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1603                                                        state->count, err_list,
1604                                                        &pnn);
1605                 if (ret2 != 0) {
1606                         D_ERR("control FREEZE_DB failed for db %s"
1607                               " on node %u, ret=%d\n",
1608                               state->db_name, pnn, ret2);
1609                         state->ban_credits[pnn] += 1;
1610                 } else {
1611                         D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1612                               state->db_name, ret);
1613                 }
1614                 tevent_req_error(req, ret);
1615                 return;
1616         }
1617
1618         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1619         subreq = ctdb_client_control_multi_send(state, state->ev,
1620                                                 state->client,
1621                                                 state->pnn_list, state->count,
1622                                                 TIMEOUT(), &request);
1623         if (tevent_req_nomem(subreq, req)) {
1624                 return;
1625         }
1626         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1627 }
1628
1629 static void recover_db_transaction_started(struct tevent_req *subreq)
1630 {
1631         struct tevent_req *req = tevent_req_callback_data(
1632                 subreq, struct tevent_req);
1633         struct recover_db_state *state = tevent_req_data(
1634                 req, struct recover_db_state);
1635         int *err_list;
1636         int ret;
1637         bool status;
1638
1639         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1640                                                 NULL);
1641         TALLOC_FREE(subreq);
1642         if (! status) {
1643                 int ret2;
1644                 uint32_t pnn;
1645
1646                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1647                                                        state->count,
1648                                                        err_list, &pnn);
1649                 if (ret2 != 0) {
1650                         D_ERR("control TRANSACTION_DB failed for db=%s"
1651                               " on node %u, ret=%d\n",
1652                               state->db_name, pnn, ret2);
1653                 } else {
1654                         D_ERR("control TRANSACTION_DB failed for db=%s,"
1655                               " ret=%d\n", state->db_name, ret);
1656                 }
1657                 tevent_req_error(req, ret);
1658                 return;
1659         }
1660
1661         state->recdb = recdb_create(state, state->db_id, state->db_name,
1662                                     state->db_path,
1663                                     state->tun_list->database_hash_size,
1664                                     state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1665         if (tevent_req_nomem(state->recdb, req)) {
1666                 return;
1667         }
1668
1669         if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1670             (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1671                 subreq = collect_highseqnum_db_send(
1672                                 state, state->ev, state->client,
1673                                 state->pnn_list, state->count, state->caps,
1674                                 state->ban_credits, state->db_id,
1675                                 state->recdb);
1676         } else {
1677                 subreq = collect_all_db_send(
1678                                 state, state->ev, state->client,
1679                                 state->pnn_list, state->count, state->caps,
1680                                 state->ban_credits, state->db_id,
1681                                 state->recdb);
1682         }
1683         if (tevent_req_nomem(subreq, req)) {
1684                 return;
1685         }
1686         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1687 }
1688
1689 static void recover_db_collect_done(struct tevent_req *subreq)
1690 {
1691         struct tevent_req *req = tevent_req_callback_data(
1692                 subreq, struct tevent_req);
1693         struct recover_db_state *state = tevent_req_data(
1694                 req, struct recover_db_state);
1695         struct ctdb_req_control request;
1696         int ret;
1697         bool status;
1698
1699         if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1700             (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1701                 status = collect_highseqnum_db_recv(subreq, &ret);
1702         } else {
1703                 status = collect_all_db_recv(subreq, &ret);
1704         }
1705         TALLOC_FREE(subreq);
1706         if (! status) {
1707                 tevent_req_error(req, ret);
1708                 return;
1709         }
1710
1711         ctdb_req_control_wipe_database(&request, &state->transdb);
1712         subreq = ctdb_client_control_multi_send(state, state->ev,
1713                                                 state->client,
1714                                                 state->pnn_list, state->count,
1715                                                 TIMEOUT(), &request);
1716         if (tevent_req_nomem(subreq, req)) {
1717                 return;
1718         }
1719         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1720 }
1721
1722 static void recover_db_wipedb_done(struct tevent_req *subreq)
1723 {
1724         struct tevent_req *req = tevent_req_callback_data(
1725                 subreq, struct tevent_req);
1726         struct recover_db_state *state = tevent_req_data(
1727                 req, struct recover_db_state);
1728         int *err_list;
1729         int ret;
1730         bool status;
1731
1732         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1733                                                 NULL);
1734         TALLOC_FREE(subreq);
1735         if (! status) {
1736                 int ret2;
1737                 uint32_t pnn;
1738
1739                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1740                                                        state->count,
1741                                                        err_list, &pnn);
1742                 if (ret2 != 0) {
1743                         D_ERR("control WIPEDB failed for db %s on node %u,"
1744                               " ret=%d\n", state->db_name, pnn, ret2);
1745                 } else {
1746                         D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1747                               state->db_name, ret);
1748                 }
1749                 tevent_req_error(req, ret);
1750                 return;
1751         }
1752
1753         subreq = push_database_send(state, state->ev, state->client,
1754                                     state->pnn_list, state->count,
1755                                     state->caps, state->tun_list,
1756                                     state->recdb);
1757         if (tevent_req_nomem(subreq, req)) {
1758                 return;
1759         }
1760         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1761 }
1762
1763 static void recover_db_pushdb_done(struct tevent_req *subreq)
1764 {
1765         struct tevent_req *req = tevent_req_callback_data(
1766                 subreq, struct tevent_req);
1767         struct recover_db_state *state = tevent_req_data(
1768                 req, struct recover_db_state);
1769         struct ctdb_req_control request;
1770         int ret;
1771         bool status;
1772
1773         status = push_database_recv(subreq, &ret);
1774         TALLOC_FREE(subreq);
1775         if (! status) {
1776                 tevent_req_error(req, ret);
1777                 return;
1778         }
1779
1780         TALLOC_FREE(state->recdb);
1781
1782         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1783         subreq = ctdb_client_control_multi_send(state, state->ev,
1784                                                 state->client,
1785                                                 state->pnn_list, state->count,
1786                                                 TIMEOUT(), &request);
1787         if (tevent_req_nomem(subreq, req)) {
1788                 return;
1789         }
1790         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1791 }
1792
1793 static void recover_db_transaction_committed(struct tevent_req *subreq)
1794 {
1795         struct tevent_req *req = tevent_req_callback_data(
1796                 subreq, struct tevent_req);
1797         struct recover_db_state *state = tevent_req_data(
1798                 req, struct recover_db_state);
1799         struct ctdb_req_control request;
1800         int *err_list;
1801         int ret;
1802         bool status;
1803
1804         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1805                                                 NULL);
1806         TALLOC_FREE(subreq);
1807         if (! status) {
1808                 int ret2;
1809                 uint32_t pnn;
1810
1811                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1812                                                        state->count,
1813                                                        err_list, &pnn);
1814                 if (ret2 != 0) {
1815                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1816                               " on node %u, ret=%d\n",
1817                               state->db_name, pnn, ret2);
1818                 } else {
1819                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1820                               " ret=%d\n", state->db_name, ret);
1821                 }
1822                 tevent_req_error(req, ret);
1823                 return;
1824         }
1825
1826         ctdb_req_control_db_thaw(&request, state->db_id);
1827         subreq = ctdb_client_control_multi_send(state, state->ev,
1828                                                 state->client,
1829                                                 state->pnn_list, state->count,
1830                                                 TIMEOUT(), &request);
1831         if (tevent_req_nomem(subreq, req)) {
1832                 return;
1833         }
1834         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1835 }
1836
1837 static void recover_db_thaw_done(struct tevent_req *subreq)
1838 {
1839         struct tevent_req *req = tevent_req_callback_data(
1840                 subreq, struct tevent_req);
1841         struct recover_db_state *state = tevent_req_data(
1842                 req, struct recover_db_state);
1843         int *err_list;
1844         int ret;
1845         bool status;
1846
1847         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1848                                                 NULL);
1849         TALLOC_FREE(subreq);
1850         if (! status) {
1851                 int ret2;
1852                 uint32_t pnn;
1853
1854                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1855                                                        state->count,
1856                                                        err_list, &pnn);
1857                 if (ret2 != 0) {
1858                         D_ERR("control DB_THAW failed for db %s on node %u,"
1859                               " ret=%d\n", state->db_name, pnn, ret2);
1860                 } else {
1861                         D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1862                               state->db_name, ret);
1863                 }
1864                 tevent_req_error(req, ret);
1865                 return;
1866         }
1867
1868         tevent_req_done(req);
1869 }
1870
1871 static bool recover_db_recv(struct tevent_req *req)
1872 {
1873         return generic_recv(req, NULL);
1874 }
1875
1876
/*
 * Start database recovery for each database
 *
 * Try to recover each database up to NUM_RETRIES times before failing
 * recovery.
 */
1882
/* State of the request that recovers all databases in a dbmap. */
struct db_recovery_state {
        /* Event context, reused when a database recovery is retried */
        struct tevent_context *ev;
        /* Databases being recovered; dbmap->num replies are expected */
        struct ctdb_dbid_map *dbmap;
        /* Number of databases whose recovery has finished */
        int num_replies;
        /* Number of finished databases that could not be recovered */
        int num_failed;
};
1889
/*
 * Per-database bookkeeping for db_recovery.  It keeps the arguments
 * needed to call recover_db_send() again when an attempt fails.
 */
struct db_recovery_one_state {
        /* The parent db_recovery request */
        struct tevent_req *req;
        struct ctdb_client_context *client;
        struct ctdb_dbid_map *dbmap;
        struct ctdb_tunable_list *tun_list;
        /* Nodes taking part in the recovery and their number */
        uint32_t *pnn_list;
        int count;
        uint32_t *caps;
        uint32_t *ban_credits;
        uint32_t generation;
        /* Identity and flags of the database, copied from the dbmap entry */
        uint32_t db_id;
        uint8_t db_flags;
        /* Number of failed recovery attempts so far */
        int num_fails;
};
1904
/* Callback invoked when the recovery attempt for one database finishes. */
static void db_recovery_one_done(struct tevent_req *subreq);
1906
1907 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1908                                            struct tevent_context *ev,
1909                                            struct ctdb_client_context *client,
1910                                            struct ctdb_dbid_map *dbmap,
1911                                            struct ctdb_tunable_list *tun_list,
1912                                            uint32_t *pnn_list, int count,
1913                                            uint32_t *caps,
1914                                            uint32_t *ban_credits,
1915                                            uint32_t generation)
1916 {
1917         struct tevent_req *req, *subreq;
1918         struct db_recovery_state *state;
1919         int i;
1920
1921         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1922         if (req == NULL) {
1923                 return NULL;
1924         }
1925
1926         state->ev = ev;
1927         state->dbmap = dbmap;
1928         state->num_replies = 0;
1929         state->num_failed = 0;
1930
1931         if (dbmap->num == 0) {
1932                 tevent_req_done(req);
1933                 return tevent_req_post(req, ev);
1934         }
1935
1936         for (i=0; i<dbmap->num; i++) {
1937                 struct db_recovery_one_state *substate;
1938
1939                 substate = talloc_zero(state, struct db_recovery_one_state);
1940                 if (tevent_req_nomem(substate, req)) {
1941                         return tevent_req_post(req, ev);
1942                 }
1943
1944                 substate->req = req;
1945                 substate->client = client;
1946                 substate->dbmap = dbmap;
1947                 substate->tun_list = tun_list;
1948                 substate->pnn_list = pnn_list;
1949                 substate->count = count;
1950                 substate->caps = caps;
1951                 substate->ban_credits = ban_credits;
1952                 substate->generation = generation;
1953                 substate->db_id = dbmap->dbs[i].db_id;
1954                 substate->db_flags = dbmap->dbs[i].flags;
1955
1956                 subreq = recover_db_send(state, ev, client, tun_list,
1957                                          pnn_list, count, caps, ban_credits,
1958                                          generation, substate->db_id,
1959                                          substate->db_flags);
1960                 if (tevent_req_nomem(subreq, req)) {
1961                         return tevent_req_post(req, ev);
1962                 }
1963                 tevent_req_set_callback(subreq, db_recovery_one_done,
1964                                         substate);
1965                 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1966         }
1967
1968         return req;
1969 }
1970
1971 static void db_recovery_one_done(struct tevent_req *subreq)
1972 {
1973         struct db_recovery_one_state *substate = tevent_req_callback_data(
1974                 subreq, struct db_recovery_one_state);
1975         struct tevent_req *req = substate->req;
1976         struct db_recovery_state *state = tevent_req_data(
1977                 req, struct db_recovery_state);
1978         bool status;
1979
1980         status = recover_db_recv(subreq);
1981         TALLOC_FREE(subreq);
1982
1983         if (status) {
1984                 talloc_free(substate);
1985                 goto done;
1986         }
1987
1988         substate->num_fails += 1;
1989         if (substate->num_fails < NUM_RETRIES) {
1990                 subreq = recover_db_send(state, state->ev, substate->client,
1991                                          substate->tun_list,
1992                                          substate->pnn_list, substate->count,
1993                                          substate->caps, substate->ban_credits,
1994                                          substate->generation, substate->db_id,
1995                                          substate->db_flags);
1996                 if (tevent_req_nomem(subreq, req)) {
1997                         goto failed;
1998                 }
1999                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2000                 D_NOTICE("recover database 0x%08x, attempt %d\n",
2001                          substate->db_id, substate->num_fails+1);
2002                 return;
2003         }
2004
2005 failed:
2006         state->num_failed += 1;
2007
2008 done:
2009         state->num_replies += 1;
2010
2011         if (state->num_replies == state->dbmap->num) {
2012                 tevent_req_done(req);
2013         }
2014 }
2015
2016 static bool db_recovery_recv(struct tevent_req *req, int *count)
2017 {
2018         struct db_recovery_state *state = tevent_req_data(
2019                 req, struct db_recovery_state);
2020         int err;
2021
2022         if (tevent_req_is_unix_error(req, &err)) {
2023                 *count = 0;
2024                 return false;
2025         }
2026
2027         *count = state->num_replies - state->num_failed;
2028
2029         if (state->num_failed > 0) {
2030                 return false;
2031         }
2032
2033         return true;
2034 }
2035
2036
2037 /*
2038  * Run the parallel database recovery
2039  *
2040  * - Get tunables
2041  * - Get nodemap
2042  * - Get vnnmap
2043  * - Get capabilities from all nodes
2044  * - Get dbmap
2045  * - Set RECOVERY_ACTIVE
2046  * - Send START_RECOVERY
2047  * - Update vnnmap on all nodes
2048  * - Run database recovery
2049  * - Set RECOVERY_NORMAL
2050  * - Send END_RECOVERY
2051  */
2052
/* State carried through all steps of the recovery request. */
struct recovery_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Generation stored in the newly calculated VNNMAP */
        uint32_t generation;
        /* Active nodes (from list_of_active_nodes()) and their number */
        uint32_t *pnn_list;
        int count;
        /* PNN of the client's node; target of the single-node controls */
        uint32_t destnode;
        struct ctdb_node_map *nodemap;
        /* Node capabilities; the array has nodemap->num entries */
        uint32_t *caps;
        /* Array with nodemap->num entries, zero-initialised */
        uint32_t *ban_credits;
        struct ctdb_tunable_list *tun_list;
        /* VNNMAP fetched from destnode, later replaced by the new one */
        struct ctdb_vnn_map *vnnmap;
        struct ctdb_dbid_map *dbmap;
};
2067
/* Completion callbacks for the individual steps of the recovery request. */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2080
2081 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2082                                         struct tevent_context *ev,
2083                                         struct ctdb_client_context *client,
2084                                         uint32_t generation)
2085 {
2086         struct tevent_req *req, *subreq;
2087         struct recovery_state *state;
2088         struct ctdb_req_control request;
2089
2090         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2091         if (req == NULL) {
2092                 return NULL;
2093         }
2094
2095         state->ev = ev;
2096         state->client = client;
2097         state->generation = generation;
2098         state->destnode = ctdb_client_pnn(client);
2099
2100         ctdb_req_control_get_all_tunables(&request);
2101         subreq = ctdb_client_control_send(state, state->ev, state->client,
2102                                           state->destnode, TIMEOUT(),
2103                                           &request);
2104         if (tevent_req_nomem(subreq, req)) {
2105                 return tevent_req_post(req, ev);
2106         }
2107         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2108
2109         return req;
2110 }
2111
2112 static void recovery_tunables_done(struct tevent_req *subreq)
2113 {
2114         struct tevent_req *req = tevent_req_callback_data(
2115                 subreq, struct tevent_req);
2116         struct recovery_state *state = tevent_req_data(
2117                 req, struct recovery_state);
2118         struct ctdb_reply_control *reply;
2119         struct ctdb_req_control request;
2120         int ret;
2121         bool status;
2122
2123         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2124         TALLOC_FREE(subreq);
2125         if (! status) {
2126                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2127                 tevent_req_error(req, ret);
2128                 return;
2129         }
2130
2131         ret = ctdb_reply_control_get_all_tunables(reply, state,
2132                                                   &state->tun_list);
2133         if (ret != 0) {
2134                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2135                 tevent_req_error(req, EPROTO);
2136                 return;
2137         }
2138
2139         talloc_free(reply);
2140
2141         recover_timeout = state->tun_list->recover_timeout;
2142
2143         ctdb_req_control_get_nodemap(&request);
2144         subreq = ctdb_client_control_send(state, state->ev, state->client,
2145                                           state->destnode, TIMEOUT(),
2146                                           &request);
2147         if (tevent_req_nomem(subreq, req)) {
2148                 return;
2149         }
2150         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2151 }
2152
2153 static void recovery_nodemap_done(struct tevent_req *subreq)
2154 {
2155         struct tevent_req *req = tevent_req_callback_data(
2156                 subreq, struct tevent_req);
2157         struct recovery_state *state = tevent_req_data(
2158                 req, struct recovery_state);
2159         struct ctdb_reply_control *reply;
2160         struct ctdb_req_control request;
2161         bool status;
2162         int ret;
2163
2164         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2165         TALLOC_FREE(subreq);
2166         if (! status) {
2167                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2168                       state->destnode, ret);
2169                 tevent_req_error(req, ret);
2170                 return;
2171         }
2172
2173         ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2174         if (ret != 0) {
2175                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2176                 tevent_req_error(req, ret);
2177                 return;
2178         }
2179
2180         state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2181                                             state, &state->pnn_list);
2182         if (state->count <= 0) {
2183                 tevent_req_error(req, ENOMEM);
2184                 return;
2185         }
2186
2187         state->ban_credits = talloc_zero_array(state, uint32_t,
2188                                                state->nodemap->num);
2189         if (tevent_req_nomem(state->ban_credits, req)) {
2190                 return;
2191         }
2192
2193         ctdb_req_control_getvnnmap(&request);
2194         subreq = ctdb_client_control_send(state, state->ev, state->client,
2195                                           state->destnode, TIMEOUT(),
2196                                           &request);
2197         if (tevent_req_nomem(subreq, req)) {
2198                 return;
2199         }
2200         tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2201 }
2202
2203 static void recovery_vnnmap_done(struct tevent_req *subreq)
2204 {
2205         struct tevent_req *req = tevent_req_callback_data(
2206                 subreq, struct tevent_req);
2207         struct recovery_state *state = tevent_req_data(
2208                 req, struct recovery_state);
2209         struct ctdb_reply_control *reply;
2210         struct ctdb_req_control request;
2211         bool status;
2212         int ret;
2213
2214         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2215         TALLOC_FREE(subreq);
2216         if (! status) {
2217                 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2218                       state->destnode, ret);
2219                 tevent_req_error(req, ret);
2220                 return;
2221         }
2222
2223         ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2224         if (ret != 0) {
2225                 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2226                 tevent_req_error(req, ret);
2227                 return;
2228         }
2229
2230         ctdb_req_control_get_capabilities(&request);
2231         subreq = ctdb_client_control_multi_send(state, state->ev,
2232                                                 state->client,
2233                                                 state->pnn_list, state->count,
2234                                                 TIMEOUT(), &request);
2235         if (tevent_req_nomem(subreq, req)) {
2236                 return;
2237         }
2238         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2239 }
2240
2241 static void recovery_capabilities_done(struct tevent_req *subreq)
2242 {
2243         struct tevent_req *req = tevent_req_callback_data(
2244                 subreq, struct tevent_req);
2245         struct recovery_state *state = tevent_req_data(
2246                 req, struct recovery_state);
2247         struct ctdb_reply_control **reply;
2248         struct ctdb_req_control request;
2249         int *err_list;
2250         int ret, i;
2251         bool status;
2252
2253         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2254                                                 &reply);
2255         TALLOC_FREE(subreq);
2256         if (! status) {
2257                 int ret2;
2258                 uint32_t pnn;
2259
2260                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2261                                                        state->count,
2262                                                        err_list, &pnn);
2263                 if (ret2 != 0) {
2264                         D_ERR("control GET_CAPABILITIES failed on node %u,"
2265                               " ret=%d\n", pnn, ret2);
2266                 } else {
2267                         D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2268                               ret);
2269                 }
2270                 tevent_req_error(req, ret);
2271                 return;
2272         }
2273
2274         /* Make the array size same as nodemap */
2275         state->caps = talloc_zero_array(state, uint32_t,
2276                                         state->nodemap->num);
2277         if (tevent_req_nomem(state->caps, req)) {
2278                 return;
2279         }
2280
2281         for (i=0; i<state->count; i++) {
2282                 uint32_t pnn;
2283
2284                 pnn = state->pnn_list[i];
2285                 ret = ctdb_reply_control_get_capabilities(reply[i],
2286                                                           &state->caps[pnn]);
2287                 if (ret != 0) {
2288                         D_ERR("control GET_CAPABILITIES failed on node %u\n",
2289                               pnn);
2290                         tevent_req_error(req, EPROTO);
2291                         return;
2292                 }
2293         }
2294
2295         talloc_free(reply);
2296
2297         ctdb_req_control_get_dbmap(&request);
2298         subreq = ctdb_client_control_send(state, state->ev, state->client,
2299                                           state->destnode, TIMEOUT(),
2300                                           &request);
2301         if (tevent_req_nomem(subreq, req)) {
2302                 return;
2303         }
2304         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2305 }
2306
2307 static void recovery_dbmap_done(struct tevent_req *subreq)
2308 {
2309         struct tevent_req *req = tevent_req_callback_data(
2310                 subreq, struct tevent_req);
2311         struct recovery_state *state = tevent_req_data(
2312                 req, struct recovery_state);
2313         struct ctdb_reply_control *reply;
2314         struct ctdb_req_control request;
2315         int ret;
2316         bool status;
2317
2318         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2319         TALLOC_FREE(subreq);
2320         if (! status) {
2321                 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2322                       state->destnode, ret);
2323                 tevent_req_error(req, ret);
2324                 return;
2325         }
2326
2327         ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2328         if (ret != 0) {
2329                 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2330                 tevent_req_error(req, ret);
2331                 return;
2332         }
2333
2334         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2335         subreq = ctdb_client_control_multi_send(state, state->ev,
2336                                                 state->client,
2337                                                 state->pnn_list, state->count,
2338                                                 TIMEOUT(), &request);
2339         if (tevent_req_nomem(subreq, req)) {
2340                 return;
2341         }
2342         tevent_req_set_callback(subreq, recovery_active_done, req);
2343 }
2344
2345 static void recovery_active_done(struct tevent_req *subreq)
2346 {
2347         struct tevent_req *req = tevent_req_callback_data(
2348                 subreq, struct tevent_req);
2349         struct recovery_state *state = tevent_req_data(
2350                 req, struct recovery_state);
2351         struct ctdb_req_control request;
2352         struct ctdb_vnn_map *vnnmap;
2353         int *err_list;
2354         int ret, count, i;
2355         bool status;
2356
2357         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2358                                                 NULL);
2359         TALLOC_FREE(subreq);
2360         if (! status) {
2361                 int ret2;
2362                 uint32_t pnn;
2363
2364                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2365                                                        state->count,
2366                                                        err_list, &pnn);
2367                 if (ret2 != 0) {
2368                         D_ERR("failed to set recovery mode ACTIVE on node %u,"
2369                               " ret=%d\n", pnn, ret2);
2370                 } else {
2371                         D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2372                               ret);
2373                 }
2374                 tevent_req_error(req, ret);
2375                 return;
2376         }
2377
2378         D_ERR("Set recovery mode to ACTIVE\n");
2379
2380         /* Calculate new VNNMAP */
2381         count = 0;
2382         for (i=0; i<state->nodemap->num; i++) {
2383                 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2384                         continue;
2385                 }
2386                 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2387                         continue;
2388                 }
2389                 count += 1;
2390         }
2391
2392         if (count == 0) {
2393                 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2394         }
2395
2396         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2397         if (tevent_req_nomem(vnnmap, req)) {
2398                 return;
2399         }
2400
2401         vnnmap->size = (count == 0 ? 1 : count);
2402         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2403         if (tevent_req_nomem(vnnmap->map, req)) {
2404                 return;
2405         }
2406
2407         if (count == 0) {
2408                 vnnmap->map[0] = state->destnode;
2409         } else {
2410                 count = 0;
2411                 for (i=0; i<state->nodemap->num; i++) {
2412                         if (state->nodemap->node[i].flags &
2413                             NODE_FLAGS_INACTIVE) {
2414                                 continue;
2415                         }
2416                         if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2417                                 continue;
2418                         }
2419
2420                         vnnmap->map[count] = state->nodemap->node[i].pnn;
2421                         count += 1;
2422                 }
2423         }
2424
2425         vnnmap->generation = state->generation;
2426
2427         talloc_free(state->vnnmap);
2428         state->vnnmap = vnnmap;
2429
2430         ctdb_req_control_start_recovery(&request);
2431         subreq = ctdb_client_control_multi_send(state, state->ev,
2432                                                 state->client,
2433                                                 state->pnn_list, state->count,
2434                                                 TIMEOUT(), &request);
2435         if (tevent_req_nomem(subreq, req)) {
2436                 return;
2437         }
2438         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2439 }
2440
2441 static void recovery_start_recovery_done(struct tevent_req *subreq)
2442 {
2443         struct tevent_req *req = tevent_req_callback_data(
2444                 subreq, struct tevent_req);
2445         struct recovery_state *state = tevent_req_data(
2446                 req, struct recovery_state);
2447         struct ctdb_req_control request;
2448         int *err_list;
2449         int ret;
2450         bool status;
2451
2452         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2453                                                 NULL);
2454         TALLOC_FREE(subreq);
2455         if (! status) {
2456                 int ret2;
2457                 uint32_t pnn;
2458
2459                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2460                                                        state->count,
2461                                                        err_list, &pnn);
2462                 if (ret2 != 0) {
2463                         D_ERR("failed to run start_recovery event on node %u,"
2464                               " ret=%d\n", pnn, ret2);
2465                 } else {
2466                         D_ERR("failed to run start_recovery event, ret=%d\n",
2467                               ret);
2468                 }
2469                 tevent_req_error(req, ret);
2470                 return;
2471         }
2472
2473         D_ERR("start_recovery event finished\n");
2474
2475         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2476         subreq = ctdb_client_control_multi_send(state, state->ev,
2477                                                 state->client,
2478                                                 state->pnn_list, state->count,
2479                                                 TIMEOUT(), &request);
2480         if (tevent_req_nomem(subreq, req)) {
2481                 return;
2482         }
2483         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2484 }
2485
2486 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2487 {
2488         struct tevent_req *req = tevent_req_callback_data(
2489                 subreq, struct tevent_req);
2490         struct recovery_state *state = tevent_req_data(
2491                 req, struct recovery_state);
2492         int *err_list;
2493         int ret;
2494         bool status;
2495
2496         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2497                                                 NULL);
2498         TALLOC_FREE(subreq);
2499         if (! status) {
2500                 int ret2;
2501                 uint32_t pnn;
2502
2503                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2504                                                        state->count,
2505                                                        err_list, &pnn);
2506                 if (ret2 != 0) {
2507                         D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2508                               pnn, ret2);
2509                 } else {
2510                         D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2511                 }
2512                 tevent_req_error(req, ret);
2513                 return;
2514         }
2515
2516         D_NOTICE("updated VNNMAP\n");
2517
2518         subreq = db_recovery_send(state, state->ev, state->client,
2519                                   state->dbmap, state->tun_list,
2520                                   state->pnn_list, state->count,
2521                                   state->caps, state->ban_credits,
2522                                   state->vnnmap->generation);
2523         if (tevent_req_nomem(subreq, req)) {
2524                 return;
2525         }
2526         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2527 }
2528
2529 static void recovery_db_recovery_done(struct tevent_req *subreq)
2530 {
2531         struct tevent_req *req = tevent_req_callback_data(
2532                 subreq, struct tevent_req);
2533         struct recovery_state *state = tevent_req_data(
2534                 req, struct recovery_state);
2535         struct ctdb_req_control request;
2536         bool status;
2537         int count;
2538
2539         status = db_recovery_recv(subreq, &count);
2540         TALLOC_FREE(subreq);
2541
2542         D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2543
2544         if (! status) {
2545                 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2546                 int i;
2547
2548                 /* Bans are not enabled */
2549                 if (state->tun_list->enable_bans == 0) {
2550                         tevent_req_error(req, EIO);
2551                         return;
2552                 }
2553
2554                 for (i=0; i<state->count; i++) {
2555                         uint32_t pnn;
2556                         pnn = state->pnn_list[i];
2557                         if (state->ban_credits[pnn] > max_credits) {
2558                                 max_pnn = pnn;
2559                                 max_credits = state->ban_credits[pnn];
2560                         }
2561                 }
2562
2563                 /* If pulling database fails multiple times */
2564                 if (max_credits >= NUM_RETRIES) {
2565                         struct ctdb_req_message message;
2566
2567                         D_ERR("Assigning banning credits to node %u\n",
2568                               max_pnn);
2569
2570                         message.srvid = CTDB_SRVID_BANNING;
2571                         message.data.pnn = max_pnn;
2572
2573                         subreq = ctdb_client_message_send(
2574                                         state, state->ev, state->client,
2575                                         ctdb_client_pnn(state->client),
2576                                         &message);
2577                         if (tevent_req_nomem(subreq, req)) {
2578                                 return;
2579                         }
2580                         tevent_req_set_callback(subreq, recovery_failed_done,
2581                                                 req);
2582                 } else {
2583                         tevent_req_error(req, EIO);
2584                 }
2585                 return;
2586         }
2587
2588         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2589         subreq = ctdb_client_control_multi_send(state, state->ev,
2590                                                 state->client,
2591                                                 state->pnn_list, state->count,
2592                                                 TIMEOUT(), &request);
2593         if (tevent_req_nomem(subreq, req)) {
2594                 return;
2595         }
2596         tevent_req_set_callback(subreq, recovery_normal_done, req);
2597 }
2598
2599 static void recovery_failed_done(struct tevent_req *subreq)
2600 {
2601         struct tevent_req *req = tevent_req_callback_data(
2602                 subreq, struct tevent_req);
2603         int ret;
2604         bool status;
2605
2606         status = ctdb_client_message_recv(subreq, &ret);
2607         TALLOC_FREE(subreq);
2608         if (! status) {
2609                 D_ERR("failed to assign banning credits, ret=%d\n", ret);
2610         }
2611
2612         tevent_req_error(req, EIO);
2613 }
2614
2615 static void recovery_normal_done(struct tevent_req *subreq)
2616 {
2617         struct tevent_req *req = tevent_req_callback_data(
2618                 subreq, struct tevent_req);
2619         struct recovery_state *state = tevent_req_data(
2620                 req, struct recovery_state);
2621         struct ctdb_req_control request;
2622         int *err_list;
2623         int ret;
2624         bool status;
2625
2626         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2627                                                 NULL);
2628         TALLOC_FREE(subreq);
2629         if (! status) {
2630                 int ret2;
2631                 uint32_t pnn;
2632
2633                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2634                                                        state->count,
2635                                                        err_list, &pnn);
2636                 if (ret2 != 0) {
2637                         D_ERR("failed to set recovery mode NORMAL on node %u,"
2638                               " ret=%d\n", pnn, ret2);
2639                 } else {
2640                         D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2641                               ret);
2642                 }
2643                 tevent_req_error(req, ret);
2644                 return;
2645         }
2646
2647         D_ERR("Set recovery mode to NORMAL\n");
2648
2649         ctdb_req_control_end_recovery(&request);
2650         subreq = ctdb_client_control_multi_send(state, state->ev,
2651                                                 state->client,
2652                                                 state->pnn_list, state->count,
2653                                                 TIMEOUT(), &request);
2654         if (tevent_req_nomem(subreq, req)) {
2655                 return;
2656         }
2657         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2658 }
2659
2660 static void recovery_end_recovery_done(struct tevent_req *subreq)
2661 {
2662         struct tevent_req *req = tevent_req_callback_data(
2663                 subreq, struct tevent_req);
2664         struct recovery_state *state = tevent_req_data(
2665                 req, struct recovery_state);
2666         int *err_list;
2667         int ret;
2668         bool status;
2669
2670         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2671                                                 NULL);
2672         TALLOC_FREE(subreq);
2673         if (! status) {
2674                 int ret2;
2675                 uint32_t pnn;
2676
2677                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2678                                                        state->count,
2679                                                        err_list, &pnn);
2680                 if (ret2 != 0) {
2681                         D_ERR("failed to run recovered event on node %u,"
2682                               " ret=%d\n", pnn, ret2);
2683                 } else {
2684                         D_ERR("failed to run recovered event, ret=%d\n", ret);
2685                 }
2686                 tevent_req_error(req, ret);
2687                 return;
2688         }
2689
2690         D_ERR("recovered event finished\n");
2691
2692         tevent_req_done(req);
2693 }
2694
/*
 * Collect the result of a recovery request.
 *
 * If the request failed and perr is not NULL, the error is stored in
 * *perr; on success *perr is left untouched, so callers should
 * initialise it beforehand.  The success flag from generic_recv() is
 * intentionally dropped.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        (void) generic_recv(req, perr);
}
2699
/*
 * Print the command line synopsis for this helper to stderr.
 */
static void usage(const char *progname)
{
        fprintf(stderr,
                "\nUsage: %s"
                " <output-fd> <ctdb-socket-path> <generation>\n",
                progname);
}
2705
2706
2707 /*
2708  * Arguments - log fd, write fd, socket path, generation
2709  */
2710 int main(int argc, char *argv[])
2711 {
2712         int write_fd;
2713         const char *sockpath;
2714         TALLOC_CTX *mem_ctx;
2715         struct tevent_context *ev;
2716         struct ctdb_client_context *client;
2717         int ret;
2718         struct tevent_req *req;
2719         uint32_t generation;
2720
2721         if (argc != 4) {
2722                 usage(argv[0]);
2723                 exit(1);
2724         }
2725
2726         write_fd = atoi(argv[1]);
2727         sockpath = argv[2];
2728         generation = (uint32_t)strtoul(argv[3], NULL, 0);
2729
2730         mem_ctx = talloc_new(NULL);
2731         if (mem_ctx == NULL) {
2732                 fprintf(stderr, "recovery: talloc_new() failed\n");
2733                 goto failed;
2734         }
2735
2736         ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2737         if (ret != 0) {
2738                 fprintf(stderr, "recovery: Unable to initialize logging\n");
2739                 goto failed;
2740         }
2741
2742         ev = tevent_context_init(mem_ctx);
2743         if (ev == NULL) {
2744                 D_ERR("tevent_context_init() failed\n");
2745                 goto failed;
2746         }
2747
2748         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2749         if (ret != 0) {
2750                 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2751                 goto failed;
2752         }
2753
2754         req = recovery_send(mem_ctx, ev, client, generation);
2755         if (req == NULL) {
2756                 D_ERR("database_recover_send() failed\n");
2757                 goto failed;
2758         }
2759
2760         if (! tevent_req_poll(req, ev)) {
2761                 D_ERR("tevent_req_poll() failed\n");
2762                 goto failed;
2763         }
2764
2765         recovery_recv(req, &ret);
2766         TALLOC_FREE(req);
2767         if (ret != 0) {
2768                 D_ERR("database recovery failed, ret=%d\n", ret);
2769                 goto failed;
2770         }
2771
2772         sys_write(write_fd, &ret, sizeof(ret));
2773         return 0;
2774
2775 failed:
2776         TALLOC_FREE(mem_ctx);
2777         return 1;
2778 }