ctdb/events: add 47.samba-dcerpcd.script
[metze/samba-autobuild/.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
35 #include "lib/util/smb_strtox.h"
36
37 #include "protocol/protocol.h"
38 #include "protocol/protocol_api.h"
39 #include "client/client.h"
40
41 #include "common/logging.h"
42
/* Timeout, in seconds, that TIMEOUT() applies to each request */
static int recover_timeout = 30;

/* NOTE(review): no user of NUM_RETRIES is visible in this part of the file */
#define NUM_RETRIES 3

/* Evaluates timeval_current_ofs() with recover_timeout seconds and 0 usec */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
48
49 /*
50  * Utility functions
51  */
52
/*
 * Shared receive helper for requests that produce no result value.
 *
 * Returns true if req did not finish with a unix error.  Otherwise
 * returns false and, when perr is not NULL, stores the error in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return !failed;
}
66
67 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
68
69 static uint64_t srvid_next(void)
70 {
71         rec_srvid += 1;
72         return rec_srvid;
73 }
74
75 /*
76  * Node related functions
77  */
78
/*
 * Fixed-capacity list of nodes.
 *
 * pnn_list, caps and ban_credits are parallel arrays of "size" slots:
 * entry i of caps and ban_credits belongs to the node in pnn_list[i].
 * The first "count" slots are in use.
 */
struct node_list {
        uint32_t *pnn_list, *caps, *ban_credits;
        unsigned int size, count;
};
86
87 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
88 {
89         struct node_list *nlist;
90         unsigned int i;
91
92         nlist = talloc_zero(mem_ctx, struct node_list);
93         if (nlist == NULL) {
94                 return NULL;
95         }
96
97         nlist->pnn_list = talloc_array(nlist, uint32_t, size);
98         nlist->caps = talloc_zero_array(nlist, uint32_t, size);
99         nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
100
101         if (nlist->pnn_list == NULL ||
102             nlist->caps == NULL ||
103             nlist->ban_credits == NULL) {
104                 talloc_free(nlist);
105                 return NULL;
106         }
107         nlist->size = size;
108
109         for (i=0; i<nlist->size; i++) {
110                 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
111         }
112
113         return nlist;
114 }
115
116 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
117 {
118         unsigned int i;
119
120         if (nlist->count == nlist->size) {
121                 return false;
122         }
123
124         for (i=0; i<nlist->count; i++) {
125                 if (nlist->pnn_list[i] == pnn) {
126                         return false;
127                 }
128         }
129
130         nlist->pnn_list[nlist->count] = pnn;
131         nlist->count += 1;
132
133         return true;
134 }
135
136 static uint32_t *node_list_lmaster(struct node_list *nlist,
137                                    TALLOC_CTX *mem_ctx,
138                                    unsigned int *pnn_count)
139 {
140         uint32_t *pnn_list;
141         unsigned int count, i;
142
143         pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
144         if (pnn_list == NULL) {
145                 return NULL;
146         }
147
148         count = 0;
149         for (i=0; i<nlist->count; i++) {
150                 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
151                         continue;
152                 }
153
154                 pnn_list[count] = nlist->pnn_list[i];
155                 count += 1;
156         }
157
158         *pnn_count = count;
159         return pnn_list;
160 }
161
162 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
163 {
164         unsigned int i;
165
166         for (i=0; i<nlist->count; i++) {
167                 if (nlist->pnn_list[i] == pnn) {
168                         nlist->ban_credits[i] += 1;
169                         break;
170                 }
171         }
172 }
173
174 /*
175  * Database list functions
176  *
177  * Simple, naive implementation that could be updated to a db_hash or similar
178  */
179
/*
 * One database known to the cluster, kept on a doubly linked list.
 *
 * pnn_list holds the nodes recorded for this database; the first
 * num_nodes entries are in use.
 */
struct db {
        struct db *prev;
        struct db *next;

        uint32_t db_id, db_flags;
        uint32_t *pnn_list;
        unsigned int num_nodes;
};
188
/*
 * List of databases.
 *
 * num_dbs counts the entries linked from db.  num_nodes is the capacity
 * of each entry's pnn_list.
 */
struct db_list {
        unsigned int num_dbs;   /* number of entries on the db list */
        struct db *db;          /* head of the list */
        unsigned int num_nodes; /* per-database node array capacity */
};
194
195 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
196 {
197         struct db_list *l;
198
199         l = talloc_zero(mem_ctx, struct db_list);
200         l->num_nodes = num_nodes;
201
202         return l;
203 }
204
205 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
206 {
207         struct db *db;
208
209         if (dblist == NULL) {
210                 return NULL;
211         }
212
213         db = dblist->db;
214         while (db != NULL && db->db_id != db_id) {
215                 db = db->next;
216         }
217
218         return db;
219 }
220
221 static int db_list_add(struct db_list *dblist,
222                        uint32_t db_id,
223                        uint32_t db_flags,
224                        uint32_t node)
225 {
226         struct db *db = NULL;
227
228         if (dblist == NULL) {
229                 return EINVAL;
230         }
231
232         db = talloc_zero(dblist, struct db);
233         if (db == NULL) {
234                 return ENOMEM;
235         }
236
237         db->db_id = db_id;
238         db->db_flags = db_flags;
239         db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
240         if (db->pnn_list == NULL) {
241                 talloc_free(db);
242                 return ENOMEM;
243         }
244         db->pnn_list[0] = node;
245         db->num_nodes = 1;
246
247         DLIST_ADD_END(dblist->db, db);
248         dblist->num_dbs++;
249
250         return 0;
251 }
252
253 static int db_list_check_and_add(struct db_list *dblist,
254                        uint32_t db_id,
255                        uint32_t db_flags,
256                        uint32_t node)
257 {
258         struct db *db = NULL;
259         int ret;
260
261         /*
262          * These flags are masked out because they are only set on a
263          * node when a client attaches to that node, so they might not
264          * be set yet.  They can't be passed as part of the attach, so
265          * they're no use here.
266          */
267         db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
268
269         if (dblist == NULL) {
270                 return EINVAL;
271         }
272
273         db = db_list_find(dblist, db_id);
274         if (db == NULL) {
275                 ret = db_list_add(dblist, db_id, db_flags, node);
276                 return ret;
277         }
278
279         if (db->db_flags != db_flags) {
280                 D_ERR("Incompatible database flags for 0x%"PRIx32" "
281                       "(0x%"PRIx32" != 0x%"PRIx32")\n",
282                       db_id,
283                       db_flags,
284                       db->db_flags);
285                 return EINVAL;
286         }
287
288         if (db->num_nodes >= dblist->num_nodes) {
289                 return EINVAL;
290         }
291
292         db->pnn_list[db->num_nodes] = node;
293         db->num_nodes++;
294
295         return 0;
296 }
297
298 /*
299  * Create database on nodes where it is missing
300  */
301
/* Request state for db_create_missing_send() */
struct db_create_missing_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;

        /* Node list that receives ban credits when an attach fails */
        struct node_list *nlist;

        /* Name of the database being attached; used in log messages */
        const char *db_name;

        /* Nodes lacking the database, and how many entries are in use */
        uint32_t *missing_pnn_list;
        int missing_num_nodes;
};
310         int missing_num_nodes;
311 };
312
313 static void db_create_missing_done(struct tevent_req *subreq);
314
315 static struct tevent_req *db_create_missing_send(
316                                         TALLOC_CTX *mem_ctx,
317                                         struct tevent_context *ev,
318                                         struct ctdb_client_context *client,
319                                         struct node_list *nlist,
320                                         const char *db_name,
321                                         struct db *db)
322 {
323         struct tevent_req *req, *subreq;
324         struct db_create_missing_state *state;
325         struct ctdb_req_control request;
326         unsigned int i, j;
327
328         req = tevent_req_create(mem_ctx,
329                                 &state,
330                                 struct db_create_missing_state);
331         if (req == NULL) {
332                 return NULL;
333         }
334
335         state->ev = ev;
336         state->client = client;
337         state->nlist = nlist;
338         state->db_name = db_name;
339
340         if (nlist->count == db->num_nodes) {
341                 tevent_req_done(req);
342                 return tevent_req_post(req, ev);
343         }
344
345         state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
346         if (tevent_req_nomem(state->missing_pnn_list, req)) {
347                 return tevent_req_post(req, ev);
348         }
349
350         for (i = 0; i < nlist->count; i++) {
351                 uint32_t pnn = nlist->pnn_list[i] ;
352
353                 for (j = 0; j < db->num_nodes; j++) {
354                         if (pnn == db->pnn_list[j]) {
355                                 break;
356                         }
357                 }
358
359                 if (j < db->num_nodes) {
360                         continue;
361                 }
362
363                 DBG_INFO("Create database %s on node %u\n",
364                          state->db_name,
365                          pnn);
366                 state->missing_pnn_list[state->missing_num_nodes] = pnn;
367                 state->missing_num_nodes++;
368         }
369
370         if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
371                 ctdb_req_control_db_attach_persistent(&request, db_name);
372         } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
373                 ctdb_req_control_db_attach_replicated(&request, db_name);
374         } else {
375                 ctdb_req_control_db_attach(&request, db_name);
376         }
377         request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
378         subreq = ctdb_client_control_multi_send(state,
379                                                 state->ev,
380                                                 state->client,
381                                                 state->missing_pnn_list,
382                                                 state->missing_num_nodes,
383                                                 TIMEOUT(),
384                                                 &request);
385         if (tevent_req_nomem(subreq, req)) {
386                 return tevent_req_post(req, ev);
387         }
388         tevent_req_set_callback(subreq, db_create_missing_done, req);
389
390         return req;
391 }
392
393 static void db_create_missing_done(struct tevent_req *subreq)
394 {
395         struct tevent_req *req = tevent_req_callback_data(
396                 subreq, struct tevent_req);
397         struct db_create_missing_state *state = tevent_req_data(
398                 req, struct db_create_missing_state);
399         int *err_list;
400         int ret;
401         bool status;
402
403         status = ctdb_client_control_multi_recv(subreq,
404                                                 &ret,
405                                                 NULL,
406                                                 &err_list,
407                                                 NULL);
408         TALLOC_FREE(subreq);
409         if (! status) {
410                 int ret2;
411                 uint32_t pnn;
412
413                 ret2 = ctdb_client_control_multi_error(
414                                                 state->missing_pnn_list,
415                                                 state->missing_num_nodes,
416                                                 err_list,
417                                                 &pnn);
418                 if (ret2 != 0) {
419                         D_ERR("control DB_ATTACH failed for db %s"
420                               " on node %u, ret=%d\n",
421                               state->db_name,
422                               pnn,
423                               ret2);
424                         node_list_ban_credits(state->nlist, pnn);
425                 } else {
426                         D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
427                               state->db_name,
428                               ret);
429                 }
430                 tevent_req_error(req, ret);
431                 return;
432         }
433
434         tevent_req_done(req);
435 }
436
/*
 * Collect the result of db_create_missing_send(): true on success,
 * false with the error in *perr (when perr is not NULL) on failure.
 */
static bool db_create_missing_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
441
442 /*
443  * Recovery database functions
444  */
445
/* Handle on the temporary recovery database built for one database */
struct recdb_context {
        uint32_t db_id;         /* id of the database being recovered */
        const char *db_name;    /* name of that database (not copied) */
        const char *db_path;    /* path of the recovery tdb file */
        struct tdb_wrap *db;    /* open recovery tdb */
        bool persistent;        /* as passed to recdb_create() */
};
453
454 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
455                                           const char *db_name,
456                                           const char *db_path,
457                                           uint32_t hash_size, bool persistent)
458 {
459         static char *db_dir_state = NULL;
460         struct recdb_context *recdb;
461         unsigned int tdb_flags;
462
463         recdb = talloc(mem_ctx, struct recdb_context);
464         if (recdb == NULL) {
465                 return NULL;
466         }
467
468         if (db_dir_state == NULL) {
469                 db_dir_state = getenv("CTDB_DBDIR_STATE");
470         }
471
472         recdb->db_name = db_name;
473         recdb->db_id = db_id;
474         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
475                                          db_dir_state != NULL ?
476                                             db_dir_state :
477                                             dirname(discard_const(db_path)),
478                                          db_name);
479         if (recdb->db_path == NULL) {
480                 talloc_free(recdb);
481                 return NULL;
482         }
483         unlink(recdb->db_path);
484
485         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
486         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
487                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
488         if (recdb->db == NULL) {
489                 talloc_free(recdb);
490                 D_ERR("failed to create recovery db %s\n", recdb->db_path);
491                 return NULL;
492         }
493
494         recdb->persistent = persistent;
495
496         return recdb;
497 }
498
499 static uint32_t recdb_id(struct recdb_context *recdb)
500 {
501         return recdb->db_id;
502 }
503
504 static const char *recdb_name(struct recdb_context *recdb)
505 {
506         return recdb->db_name;
507 }
508
509 static const char *recdb_path(struct recdb_context *recdb)
510 {
511         return recdb->db_path;
512 }
513
514 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
515 {
516         return recdb->db->tdb;
517 }
518
519 static bool recdb_persistent(struct recdb_context *recdb)
520 {
521         return recdb->persistent;
522 }
523
/* Context handed to recdb_add_traverse() for each record */
struct recdb_add_traverse_state {
        /* Recovery database that receives the records */
        struct recdb_context *recdb;
        /* PNN compared against the dmaster of an existing record */
        uint32_t mypnn;
};
528
529 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
530                               TDB_DATA key, TDB_DATA data,
531                               void *private_data)
532 {
533         struct recdb_add_traverse_state *state =
534                 (struct recdb_add_traverse_state *)private_data;
535         struct ctdb_ltdb_header *hdr;
536         TDB_DATA prev_data;
537         int ret;
538
539         /* header is not marshalled separately in the pulldb control */
540         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
541                 return -1;
542         }
543
544         hdr = (struct ctdb_ltdb_header *)data.dptr;
545
546         /* fetch the existing record, if any */
547         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
548
549         if (prev_data.dptr != NULL) {
550                 struct ctdb_ltdb_header prev_hdr;
551
552                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
553                 free(prev_data.dptr);
554                 if (hdr->rsn < prev_hdr.rsn ||
555                     (hdr->rsn == prev_hdr.rsn &&
556                      prev_hdr.dmaster != state->mypnn)) {
557                         return 0;
558                 }
559         }
560
561         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
562         if (ret != 0) {
563                 return -1;
564         }
565         return 0;
566 }
567
568 static bool recdb_add(struct recdb_context *recdb, int mypnn,
569                       struct ctdb_rec_buffer *recbuf)
570 {
571         struct recdb_add_traverse_state state;
572         int ret;
573
574         state.recdb = recdb;
575         state.mypnn = mypnn;
576
577         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
578         if (ret != 0) {
579                 return false;
580         }
581
582         return true;
583 }
584
585 /* This function decides which records from recdb are retained */
586 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
587                              uint32_t reqid, uint32_t dmaster,
588                              TDB_DATA key, TDB_DATA data)
589 {
590         struct ctdb_ltdb_header *header;
591         int ret;
592
593         /* Skip empty records */
594         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
595                 return 0;
596         }
597
598         /* update the dmaster field to point to us */
599         header = (struct ctdb_ltdb_header *)data.dptr;
600         if (!persistent) {
601                 header->dmaster = dmaster;
602                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
603         }
604
605         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
606         if (ret != 0) {
607                 return ret;
608         }
609
610         return 0;
611 }
612
613 struct recdb_file_traverse_state {
614         struct ctdb_rec_buffer *recbuf;
615         struct recdb_context *recdb;
616         TALLOC_CTX *mem_ctx;
617         uint32_t dmaster;
618         uint32_t reqid;
619         bool persistent;
620         bool failed;
621         int fd;
622         size_t max_size;
623         unsigned int num_buffers;
624 };
625
626 static int recdb_file_traverse(struct tdb_context *tdb,
627                                TDB_DATA key, TDB_DATA data,
628                                void *private_data)
629 {
630         struct recdb_file_traverse_state *state =
631                 (struct recdb_file_traverse_state *)private_data;
632         int ret;
633
634         ret = recbuf_filter_add(state->recbuf, state->persistent,
635                                 state->reqid, state->dmaster, key, data);
636         if (ret != 0) {
637                 state->failed = true;
638                 return ret;
639         }
640
641         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
642                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
643                 if (ret != 0) {
644                         D_ERR("Failed to collect recovery records for %s\n",
645                               recdb_name(state->recdb));
646                         state->failed = true;
647                         return ret;
648                 }
649
650                 state->num_buffers += 1;
651
652                 TALLOC_FREE(state->recbuf);
653                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
654                                                      recdb_id(state->recdb));
655                 if (state->recbuf == NULL) {
656                         state->failed = true;
657                         return ENOMEM;
658                 }
659         }
660
661         return 0;
662 }
663
664 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
665                       uint32_t dmaster, int fd, int max_size)
666 {
667         struct recdb_file_traverse_state state;
668         int ret;
669
670         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
671         if (state.recbuf == NULL) {
672                 return -1;
673         }
674         state.recdb = recdb;
675         state.mem_ctx = mem_ctx;
676         state.dmaster = dmaster;
677         state.reqid = 0;
678         state.persistent = recdb_persistent(recdb);
679         state.failed = false;
680         state.fd = fd;
681         state.max_size = max_size;
682         state.num_buffers = 0;
683
684         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
685         if (ret == -1 || state.failed) {
686                 TALLOC_FREE(state.recbuf);
687                 return -1;
688         }
689
690         ret = ctdb_rec_buffer_write(state.recbuf, fd);
691         if (ret != 0) {
692                 D_ERR("Failed to collect recovery records for %s\n",
693                       recdb_name(recdb));
694                 TALLOC_FREE(state.recbuf);
695                 return -1;
696         }
697         state.num_buffers += 1;
698
699         D_DEBUG("Wrote %d buffers of recovery records for %s\n",
700                 state.num_buffers, recdb_name(recdb));
701
702         return state.num_buffers;
703 }
704
705 /*
706  * Pull database from a single node
707  */
708
/* Request state for pull_database_send() */
struct pull_database_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Recovery database that receives the pulled records */
        struct recdb_context *recdb;
        /* Node the database is pulled from */
        uint32_t pnn;
        /* Server id the record messages arrive on */
        uint64_t srvid;
        /* Records received by the message handler so far */
        unsigned int num_records;
        /* Error to report once the handler has been removed; 0 if none */
        int result;
};
718
719 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
720                                   void *private_data);
721 static void pull_database_register_done(struct tevent_req *subreq);
722 static void pull_database_unregister_done(struct tevent_req *subreq);
723 static void pull_database_done(struct tevent_req *subreq);
724
725 static struct tevent_req *pull_database_send(
726                         TALLOC_CTX *mem_ctx,
727                         struct tevent_context *ev,
728                         struct ctdb_client_context *client,
729                         uint32_t pnn,
730                         struct recdb_context *recdb)
731 {
732         struct tevent_req *req, *subreq;
733         struct pull_database_state *state;
734
735         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
736         if (req == NULL) {
737                 return NULL;
738         }
739
740         state->ev = ev;
741         state->client = client;
742         state->recdb = recdb;
743         state->pnn = pnn;
744         state->srvid = srvid_next();
745
746         subreq = ctdb_client_set_message_handler_send(
747                                         state, state->ev, state->client,
748                                         state->srvid, pull_database_handler,
749                                         req);
750         if (tevent_req_nomem(subreq, req)) {
751                 return tevent_req_post(req, ev);
752         }
753
754         tevent_req_set_callback(subreq, pull_database_register_done, req);
755
756         return req;
757 }
758
759 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
760                                   void *private_data)
761 {
762         struct tevent_req *req = talloc_get_type_abort(
763                 private_data, struct tevent_req);
764         struct pull_database_state *state = tevent_req_data(
765                 req, struct pull_database_state);
766         struct ctdb_rec_buffer *recbuf;
767         size_t np;
768         int ret;
769         bool status;
770
771         if (srvid != state->srvid) {
772                 return;
773         }
774
775         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
776         if (ret != 0) {
777                 D_ERR("Invalid data received for DB_PULL messages\n");
778                 return;
779         }
780
781         if (recbuf->db_id != recdb_id(state->recdb)) {
782                 talloc_free(recbuf);
783                 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
784                       recbuf->db_id, recdb_name(state->recdb));
785                 return;
786         }
787
788         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
789                            recbuf);
790         if (! status) {
791                 talloc_free(recbuf);
792                 D_ERR("Failed to add records to recdb for %s\n",
793                       recdb_name(state->recdb));
794                 return;
795         }
796
797         state->num_records += recbuf->count;
798         talloc_free(recbuf);
799 }
800
801 static void pull_database_register_done(struct tevent_req *subreq)
802 {
803         struct tevent_req *req = tevent_req_callback_data(
804                 subreq, struct tevent_req);
805         struct pull_database_state *state = tevent_req_data(
806                 req, struct pull_database_state);
807         struct ctdb_req_control request;
808         struct ctdb_pulldb_ext pulldb_ext;
809         int ret;
810         bool status;
811
812         status = ctdb_client_set_message_handler_recv(subreq, &ret);
813         TALLOC_FREE(subreq);
814         if (! status) {
815                 D_ERR("Failed to set message handler for DB_PULL for %s\n",
816                       recdb_name(state->recdb));
817                 tevent_req_error(req, ret);
818                 return;
819         }
820
821         pulldb_ext.db_id = recdb_id(state->recdb);
822         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
823         pulldb_ext.srvid = state->srvid;
824
825         ctdb_req_control_db_pull(&request, &pulldb_ext);
826         subreq = ctdb_client_control_send(state, state->ev, state->client,
827                                           state->pnn, TIMEOUT(), &request);
828         if (tevent_req_nomem(subreq, req)) {
829                 return;
830         }
831         tevent_req_set_callback(subreq, pull_database_done, req);
832 }
833
834 static void pull_database_done(struct tevent_req *subreq)
835 {
836         struct tevent_req *req = tevent_req_callback_data(
837                 subreq, struct tevent_req);
838         struct pull_database_state *state = tevent_req_data(
839                 req, struct pull_database_state);
840         struct ctdb_reply_control *reply;
841         uint32_t num_records;
842         int ret;
843         bool status;
844
845         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
846         TALLOC_FREE(subreq);
847         if (! status) {
848                 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
849                       recdb_name(state->recdb), state->pnn, ret);
850                 state->result = ret;
851                 goto unregister;
852         }
853
854         ret = ctdb_reply_control_db_pull(reply, &num_records);
855         talloc_free(reply);
856         if (num_records != state->num_records) {
857                 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
858                       num_records, state->num_records,
859                       recdb_name(state->recdb));
860                 state->result = EIO;
861                 goto unregister;
862         }
863
864         D_INFO("Pulled %d records for db %s from node %d\n",
865                state->num_records, recdb_name(state->recdb), state->pnn);
866
867 unregister:
868
869         subreq = ctdb_client_remove_message_handler_send(
870                                         state, state->ev, state->client,
871                                         state->srvid, req);
872         if (tevent_req_nomem(subreq, req)) {
873                 return;
874         }
875         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
876 }
877
878 static void pull_database_unregister_done(struct tevent_req *subreq)
879 {
880         struct tevent_req *req = tevent_req_callback_data(
881                 subreq, struct tevent_req);
882         struct pull_database_state *state = tevent_req_data(
883                 req, struct pull_database_state);
884         int ret;
885         bool status;
886
887         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
888         TALLOC_FREE(subreq);
889         if (! status) {
890                 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
891                       recdb_name(state->recdb));
892                 tevent_req_error(req, ret);
893                 return;
894         }
895
896         if (state->result != 0) {
897                 tevent_req_error(req, state->result);
898                 return;
899         }
900
901         tevent_req_done(req);
902 }
903
/*
 * Collect the result of pull_database_send(): true on success, false
 * with the error in *perr (when perr is not NULL) on failure.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
908
909 /*
910  * Push database to specified nodes (new style)
911  */
912
/* Request state for push_database_send() */
struct push_database_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Recovery database whose records are pushed */
        struct recdb_context *recdb;
        /* Target nodes and how many of them there are */
        uint32_t *pnn_list;
        unsigned int count;
        /* Server id taken from srvid_next() for this push */
        uint64_t srvid;
        /* Local node number, from ctdb_client_pnn() */
        uint32_t dmaster;
        /* Temporary file holding the record buffers */
        int fd;
        /* Buffers written to the file by recdb_file() */
        int num_buffers;
        /* NOTE(review): presumably buffers sent so far - confirm in the
         * functions that follow push_database_send() */
        int num_buffers_sent;
        unsigned int num_records;
};
926
927 static void push_database_started(struct tevent_req *subreq);
928 static void push_database_send_msg(struct tevent_req *req);
929 static void push_database_send_done(struct tevent_req *subreq);
930 static void push_database_confirmed(struct tevent_req *subreq);
931
932 static struct tevent_req *push_database_send(
933                         TALLOC_CTX *mem_ctx,
934                         struct tevent_context *ev,
935                         struct ctdb_client_context *client,
936                         uint32_t *pnn_list,
937                         unsigned int count,
938                         struct recdb_context *recdb,
939                         int max_size)
940 {
941         struct tevent_req *req, *subreq;
942         struct push_database_state *state;
943         struct ctdb_req_control request;
944         struct ctdb_pulldb_ext pulldb_ext;
945         char *filename;
946         off_t offset;
947
948         req = tevent_req_create(mem_ctx, &state,
949                                 struct push_database_state);
950         if (req == NULL) {
951                 return NULL;
952         }
953
954         state->ev = ev;
955         state->client = client;
956         state->recdb = recdb;
957         state->pnn_list = pnn_list;
958         state->count = count;
959
960         state->srvid = srvid_next();
961         state->dmaster = ctdb_client_pnn(client);
962         state->num_buffers_sent = 0;
963         state->num_records = 0;
964
965         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
966         if (tevent_req_nomem(filename, req)) {
967                 return tevent_req_post(req, ev);
968         }
969
970         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
971         if (state->fd == -1) {
972                 tevent_req_error(req, errno);
973                 return tevent_req_post(req, ev);
974         }
975         unlink(filename);
976         talloc_free(filename);
977
978         state->num_buffers = recdb_file(recdb, state, state->dmaster,
979                                         state->fd, max_size);
980         if (state->num_buffers == -1) {
981                 tevent_req_error(req, ENOMEM);
982                 return tevent_req_post(req, ev);
983         }
984
985         offset = lseek(state->fd, 0, SEEK_SET);
986         if (offset != 0) {
987                 tevent_req_error(req, EIO);
988                 return tevent_req_post(req, ev);
989         }
990
991         pulldb_ext.db_id = recdb_id(recdb);
992         pulldb_ext.srvid = state->srvid;
993
994         ctdb_req_control_db_push_start(&request, &pulldb_ext);
995         subreq = ctdb_client_control_multi_send(state, ev, client,
996                                                 pnn_list, count,
997                                                 TIMEOUT(), &request);
998         if (tevent_req_nomem(subreq, req)) {
999                 return tevent_req_post(req, ev);
1000         }
1001         tevent_req_set_callback(subreq, push_database_started, req);
1002
1003         return req;
1004 }
1005
1006 static void push_database_started(struct tevent_req *subreq)
1007 {
1008         struct tevent_req *req = tevent_req_callback_data(
1009                 subreq, struct tevent_req);
1010         struct push_database_state *state = tevent_req_data(
1011                 req, struct push_database_state);
1012         int *err_list;
1013         int ret;
1014         bool status;
1015
1016         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1017                                                 &err_list, NULL);
1018         TALLOC_FREE(subreq);
1019         if (! status) {
1020                 int ret2;
1021                 uint32_t pnn;
1022
1023                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1024                                                        state->count,
1025                                                        err_list, &pnn);
1026                 if (ret2 != 0) {
1027                         D_ERR("control DB_PUSH_START failed for db %s"
1028                               " on node %u, ret=%d\n",
1029                               recdb_name(state->recdb), pnn, ret2);
1030                 } else {
1031                         D_ERR("control DB_PUSH_START failed for db %s,"
1032                               " ret=%d\n",
1033                               recdb_name(state->recdb), ret);
1034                 }
1035                 talloc_free(err_list);
1036
1037                 tevent_req_error(req, ret);
1038                 return;
1039         }
1040
1041         push_database_send_msg(req);
1042 }
1043
1044 static void push_database_send_msg(struct tevent_req *req)
1045 {
1046         struct push_database_state *state = tevent_req_data(
1047                 req, struct push_database_state);
1048         struct tevent_req *subreq;
1049         struct ctdb_rec_buffer *recbuf;
1050         struct ctdb_req_message message;
1051         TDB_DATA data;
1052         size_t np;
1053         int ret;
1054
1055         if (state->num_buffers_sent == state->num_buffers) {
1056                 struct ctdb_req_control request;
1057
1058                 ctdb_req_control_db_push_confirm(&request,
1059                                                  recdb_id(state->recdb));
1060                 subreq = ctdb_client_control_multi_send(state, state->ev,
1061                                                         state->client,
1062                                                         state->pnn_list,
1063                                                         state->count,
1064                                                         TIMEOUT(), &request);
1065                 if (tevent_req_nomem(subreq, req)) {
1066                         return;
1067                 }
1068                 tevent_req_set_callback(subreq, push_database_confirmed, req);
1069                 return;
1070         }
1071
1072         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1073         if (ret != 0) {
1074                 tevent_req_error(req, ret);
1075                 return;
1076         }
1077
1078         data.dsize = ctdb_rec_buffer_len(recbuf);
1079         data.dptr = talloc_size(state, data.dsize);
1080         if (tevent_req_nomem(data.dptr, req)) {
1081                 return;
1082         }
1083
1084         ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1085
1086         message.srvid = state->srvid;
1087         message.data.data = data;
1088
1089         D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1090                 state->num_buffers_sent, recbuf->count,
1091                 recdb_name(state->recdb));
1092
1093         subreq = ctdb_client_message_multi_send(state, state->ev,
1094                                                 state->client,
1095                                                 state->pnn_list, state->count,
1096                                                 &message);
1097         if (tevent_req_nomem(subreq, req)) {
1098                 return;
1099         }
1100         tevent_req_set_callback(subreq, push_database_send_done, req);
1101
1102         state->num_records += recbuf->count;
1103
1104         talloc_free(data.dptr);
1105         talloc_free(recbuf);
1106 }
1107
1108 static void push_database_send_done(struct tevent_req *subreq)
1109 {
1110         struct tevent_req *req = tevent_req_callback_data(
1111                 subreq, struct tevent_req);
1112         struct push_database_state *state = tevent_req_data(
1113                 req, struct push_database_state);
1114         bool status;
1115         int ret;
1116
1117         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1118         TALLOC_FREE(subreq);
1119         if (! status) {
1120                 D_ERR("Sending recovery records failed for %s\n",
1121                       recdb_name(state->recdb));
1122                 tevent_req_error(req, ret);
1123                 return;
1124         }
1125
1126         state->num_buffers_sent += 1;
1127
1128         push_database_send_msg(req);
1129 }
1130
1131 static void push_database_confirmed(struct tevent_req *subreq)
1132 {
1133         struct tevent_req *req = tevent_req_callback_data(
1134                 subreq, struct tevent_req);
1135         struct push_database_state *state = tevent_req_data(
1136                 req, struct push_database_state);
1137         struct ctdb_reply_control **reply;
1138         int *err_list;
1139         bool status;
1140         unsigned int i;
1141         int ret;
1142         uint32_t num_records;
1143
1144         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1145                                                 &err_list, &reply);
1146         TALLOC_FREE(subreq);
1147         if (! status) {
1148                 int ret2;
1149                 uint32_t pnn;
1150
1151                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1152                                                        state->count, err_list,
1153                                                        &pnn);
1154                 if (ret2 != 0) {
1155                         D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1156                               " on node %u, ret=%d\n",
1157                               recdb_name(state->recdb), pnn, ret2);
1158                 } else {
1159                         D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1160                               " ret=%d\n",
1161                               recdb_name(state->recdb), ret);
1162                 }
1163                 tevent_req_error(req, ret);
1164                 return;
1165         }
1166
1167         for (i=0; i<state->count; i++) {
1168                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1169                                                          &num_records);
1170                 if (ret != 0) {
1171                         tevent_req_error(req, EPROTO);
1172                         return;
1173                 }
1174
1175                 if (num_records != state->num_records) {
1176                         D_ERR("Node %u received %d of %d records for %s\n",
1177                               state->pnn_list[i], num_records,
1178                               state->num_records, recdb_name(state->recdb));
1179                         tevent_req_error(req, EPROTO);
1180                         return;
1181                 }
1182         }
1183
1184         talloc_free(reply);
1185
1186         D_INFO("Pushed %d records for db %s\n",
1187                state->num_records, recdb_name(state->recdb));
1188
1189         tevent_req_done(req);
1190 }
1191
/*
 * Collect the outcome of a push_database request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1196
1197 /*
1198  * Collect databases using highest sequence number
1199  */
1200
/* State for collecting a database from the node with the highest seqnum */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Nodes queried for their database sequence number */
	struct node_list *nlist;
	uint32_t db_id;
	/* Recovery database the chosen node's records are pulled into */
	struct recdb_context *recdb;

	/* Node selected to pull from once the seqnum replies are in */
	uint32_t max_pnn;
};

/* Continuation steps of a collect_highseqnum_db request */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1213
1214 static struct tevent_req *collect_highseqnum_db_send(
1215                         TALLOC_CTX *mem_ctx,
1216                         struct tevent_context *ev,
1217                         struct ctdb_client_context *client,
1218                         struct node_list *nlist,
1219                         uint32_t db_id,
1220                         struct recdb_context *recdb)
1221 {
1222         struct tevent_req *req, *subreq;
1223         struct collect_highseqnum_db_state *state;
1224         struct ctdb_req_control request;
1225
1226         req = tevent_req_create(mem_ctx, &state,
1227                                 struct collect_highseqnum_db_state);
1228         if (req == NULL) {
1229                 return NULL;
1230         }
1231
1232         state->ev = ev;
1233         state->client = client;
1234         state->nlist = nlist;
1235         state->db_id = db_id;
1236         state->recdb = recdb;
1237
1238         ctdb_req_control_get_db_seqnum(&request, db_id);
1239         subreq = ctdb_client_control_multi_send(mem_ctx,
1240                                                 ev,
1241                                                 client,
1242                                                 nlist->pnn_list,
1243                                                 nlist->count,
1244                                                 TIMEOUT(),
1245                                                 &request);
1246         if (tevent_req_nomem(subreq, req)) {
1247                 return tevent_req_post(req, ev);
1248         }
1249         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1250                                 req);
1251
1252         return req;
1253 }
1254
1255 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1256 {
1257         struct tevent_req *req = tevent_req_callback_data(
1258                 subreq, struct tevent_req);
1259         struct collect_highseqnum_db_state *state = tevent_req_data(
1260                 req, struct collect_highseqnum_db_state);
1261         struct ctdb_reply_control **reply;
1262         int *err_list;
1263         bool status;
1264         unsigned int i;
1265         int ret;
1266         uint64_t seqnum, max_seqnum;
1267
1268         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1269                                                 &err_list, &reply);
1270         TALLOC_FREE(subreq);
1271         if (! status) {
1272                 int ret2;
1273                 uint32_t pnn;
1274
1275                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1276                                                        state->nlist->count,
1277                                                        err_list,
1278                                                        &pnn);
1279                 if (ret2 != 0) {
1280                         D_ERR("control GET_DB_SEQNUM failed for db %s"
1281                               " on node %u, ret=%d\n",
1282                               recdb_name(state->recdb), pnn, ret2);
1283                 } else {
1284                         D_ERR("control GET_DB_SEQNUM failed for db %s,"
1285                               " ret=%d\n",
1286                               recdb_name(state->recdb), ret);
1287                 }
1288                 tevent_req_error(req, ret);
1289                 return;
1290         }
1291
1292         max_seqnum = 0;
1293         state->max_pnn = state->nlist->pnn_list[0];
1294         for (i=0; i<state->nlist->count; i++) {
1295                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1296                 if (ret != 0) {
1297                         tevent_req_error(req, EPROTO);
1298                         return;
1299                 }
1300
1301                 if (max_seqnum < seqnum) {
1302                         max_seqnum = seqnum;
1303                         state->max_pnn = state->nlist->pnn_list[i];
1304                 }
1305         }
1306
1307         talloc_free(reply);
1308
1309         D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1310                recdb_name(state->recdb), state->max_pnn, max_seqnum);
1311
1312         subreq = pull_database_send(state,
1313                                     state->ev,
1314                                     state->client,
1315                                     state->max_pnn,
1316                                     state->recdb);
1317         if (tevent_req_nomem(subreq, req)) {
1318                 return;
1319         }
1320         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1321                                 req);
1322 }
1323
1324 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1325 {
1326         struct tevent_req *req = tevent_req_callback_data(
1327                 subreq, struct tevent_req);
1328         struct collect_highseqnum_db_state *state = tevent_req_data(
1329                 req, struct collect_highseqnum_db_state);
1330         int ret;
1331         bool status;
1332
1333         status = pull_database_recv(subreq, &ret);
1334         TALLOC_FREE(subreq);
1335         if (! status) {
1336                 node_list_ban_credits(state->nlist, state->max_pnn);
1337                 tevent_req_error(req, ret);
1338                 return;
1339         }
1340
1341         tevent_req_done(req);
1342 }
1343
/*
 * Collect the outcome of a collect_highseqnum_db request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1348
1349 /*
1350  * Collect all databases
1351  */
1352
1353 struct collect_all_db_state {
1354         struct tevent_context *ev;
1355         struct ctdb_client_context *client;
1356         struct node_list *nlist;
1357         uint32_t db_id;
1358         struct recdb_context *recdb;
1359
1360         struct ctdb_pulldb pulldb;
1361         unsigned int index;
1362 };
1363
1364 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1365
1366 static struct tevent_req *collect_all_db_send(
1367                         TALLOC_CTX *mem_ctx,
1368                         struct tevent_context *ev,
1369                         struct ctdb_client_context *client,
1370                         struct node_list *nlist,
1371                         uint32_t db_id,
1372                         struct recdb_context *recdb)
1373 {
1374         struct tevent_req *req, *subreq;
1375         struct collect_all_db_state *state;
1376
1377         req = tevent_req_create(mem_ctx, &state,
1378                                 struct collect_all_db_state);
1379         if (req == NULL) {
1380                 return NULL;
1381         }
1382
1383         state->ev = ev;
1384         state->client = client;
1385         state->nlist = nlist;
1386         state->db_id = db_id;
1387         state->recdb = recdb;
1388         state->index = 0;
1389
1390         subreq = pull_database_send(state,
1391                                     ev,
1392                                     client,
1393                                     nlist->pnn_list[state->index],
1394                                     recdb);
1395         if (tevent_req_nomem(subreq, req)) {
1396                 return tevent_req_post(req, ev);
1397         }
1398         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1399
1400         return req;
1401 }
1402
1403 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1404 {
1405         struct tevent_req *req = tevent_req_callback_data(
1406                 subreq, struct tevent_req);
1407         struct collect_all_db_state *state = tevent_req_data(
1408                 req, struct collect_all_db_state);
1409         int ret;
1410         bool status;
1411
1412         status = pull_database_recv(subreq, &ret);
1413         TALLOC_FREE(subreq);
1414         if (! status) {
1415                 node_list_ban_credits(state->nlist,
1416                                       state->nlist->pnn_list[state->index]);
1417                 tevent_req_error(req, ret);
1418                 return;
1419         }
1420
1421         state->index += 1;
1422         if (state->index == state->nlist->count) {
1423                 tevent_req_done(req);
1424                 return;
1425         }
1426
1427         subreq = pull_database_send(state,
1428                                     state->ev,
1429                                     state->client,
1430                                     state->nlist->pnn_list[state->index],
1431                                     state->recdb);
1432         if (tevent_req_nomem(subreq, req)) {
1433                 return;
1434         }
1435         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1436 }
1437
/*
 * Collect the outcome of a collect_all_db request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1442
1443
1444 /**
1445  * For each database do the following:
1446  *  - Get DB name from all nodes
1447  *  - Attach database on missing nodes
1448  *  - Get DB path
1449  *  - Freeze database on all nodes
1450  *  - Start transaction on all nodes
1451  *  - Collect database from all nodes
1452  *  - Wipe database on all nodes
1453  *  - Push database to all nodes
1454  *  - Commit transaction on all nodes
1455  *  - Thaw database on all nodes
1456  */
1457
1458 struct recover_db_state {
1459         struct tevent_context *ev;
1460         struct ctdb_client_context *client;
1461         struct ctdb_tunable_list *tun_list;
1462         struct node_list *nlist;
1463         struct db *db;
1464
1465         uint32_t destnode;
1466         struct ctdb_transdb transdb;
1467
1468         const char *db_name, *db_path;
1469         struct recdb_context *recdb;
1470 };
1471
1472 static void recover_db_name_done(struct tevent_req *subreq);
1473 static void recover_db_create_missing_done(struct tevent_req *subreq);
1474 static void recover_db_path_done(struct tevent_req *subreq);
1475 static void recover_db_freeze_done(struct tevent_req *subreq);
1476 static void recover_db_transaction_started(struct tevent_req *subreq);
1477 static void recover_db_collect_done(struct tevent_req *subreq);
1478 static void recover_db_wipedb_done(struct tevent_req *subreq);
1479 static void recover_db_pushdb_done(struct tevent_req *subreq);
1480 static void recover_db_transaction_committed(struct tevent_req *subreq);
1481 static void recover_db_thaw_done(struct tevent_req *subreq);
1482
1483 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1484                                           struct tevent_context *ev,
1485                                           struct ctdb_client_context *client,
1486                                           struct ctdb_tunable_list *tun_list,
1487                                           struct node_list *nlist,
1488                                           uint32_t generation,
1489                                           struct db *db)
1490 {
1491         struct tevent_req *req, *subreq;
1492         struct recover_db_state *state;
1493         struct ctdb_req_control request;
1494
1495         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1496         if (req == NULL) {
1497                 return NULL;
1498         }
1499
1500         state->ev = ev;
1501         state->client = client;
1502         state->tun_list = tun_list;
1503         state->nlist = nlist;
1504         state->db = db;
1505
1506         state->destnode = ctdb_client_pnn(client);
1507         state->transdb.db_id = db->db_id;
1508         state->transdb.tid = generation;
1509
1510         ctdb_req_control_get_dbname(&request, db->db_id);
1511         subreq = ctdb_client_control_multi_send(state,
1512                                                 ev,
1513                                                 client,
1514                                                 state->db->pnn_list,
1515                                                 state->db->num_nodes,
1516                                                 TIMEOUT(),
1517                                                 &request);
1518         if (tevent_req_nomem(subreq, req)) {
1519                 return tevent_req_post(req, ev);
1520         }
1521         tevent_req_set_callback(subreq, recover_db_name_done, req);
1522
1523         return req;
1524 }
1525
1526 static void recover_db_name_done(struct tevent_req *subreq)
1527 {
1528         struct tevent_req *req = tevent_req_callback_data(
1529                 subreq, struct tevent_req);
1530         struct recover_db_state *state = tevent_req_data(
1531                 req, struct recover_db_state);
1532         struct ctdb_reply_control **reply;
1533         int *err_list;
1534         unsigned int i;
1535         int ret;
1536         bool status;
1537
1538         status = ctdb_client_control_multi_recv(subreq,
1539                                                 &ret,
1540                                                 state,
1541                                                 &err_list,
1542                                                 &reply);
1543         TALLOC_FREE(subreq);
1544         if (! status) {
1545                 int ret2;
1546                 uint32_t pnn;
1547
1548                 ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
1549                                                        state->db->num_nodes,
1550                                                        err_list,
1551                                                        &pnn);
1552                 if (ret2 != 0) {
1553                         D_ERR("control GET_DBNAME failed on node %u,"
1554                               " ret=%d\n",
1555                               pnn,
1556                               ret2);
1557                 } else {
1558                         D_ERR("control GET_DBNAME failed, ret=%d\n",
1559                               ret);
1560                 }
1561                 tevent_req_error(req, ret);
1562                 return;
1563         }
1564
1565         for (i = 0; i < state->db->num_nodes; i++) {
1566                 const char *db_name;
1567                 uint32_t pnn;
1568
1569                 pnn = state->nlist->pnn_list[i];
1570
1571                 ret = ctdb_reply_control_get_dbname(reply[i],
1572                                                     state,
1573                                                     &db_name);
1574                 if (ret != 0) {
1575                         D_ERR("control GET_DBNAME failed on node %u "
1576                               "for db=0x%x, ret=%d\n",
1577                               pnn,
1578                               state->db->db_id,
1579                               ret);
1580                         tevent_req_error(req, EPROTO);
1581                         return;
1582                 }
1583
1584                 if (state->db_name == NULL) {
1585                         state->db_name = db_name;
1586                         continue;
1587                 }
1588
1589                 if (strcmp(state->db_name, db_name) != 0) {
1590                         D_ERR("Incompatible database name for 0x%"PRIx32" "
1591                               "(%s != %s) on node %"PRIu32"\n",
1592                               state->db->db_id,
1593                               db_name,
1594                               state->db_name,
1595                               pnn);
1596                         node_list_ban_credits(state->nlist, pnn);
1597                         tevent_req_error(req, ret);
1598                         return;
1599                 }
1600         }
1601
1602         talloc_free(reply);
1603
1604         subreq = db_create_missing_send(state,
1605                                         state->ev,
1606                                         state->client,
1607                                         state->nlist,
1608                                         state->db_name,
1609                                         state->db);
1610
1611         if (tevent_req_nomem(subreq, req)) {
1612                 return;
1613         }
1614         tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
1615 }
1616
1617 static void recover_db_create_missing_done(struct tevent_req *subreq)
1618 {
1619         struct tevent_req *req = tevent_req_callback_data(
1620                 subreq, struct tevent_req);
1621         struct recover_db_state *state = tevent_req_data(
1622                 req, struct recover_db_state);
1623         struct ctdb_req_control request;
1624         int ret;
1625         bool status;
1626
1627         /* Could sanity check the db_id here */
1628         status = db_create_missing_recv(subreq, &ret);
1629         TALLOC_FREE(subreq);
1630         if (! status) {
1631                 tevent_req_error(req, ret);
1632                 return;
1633         }
1634
1635         ctdb_req_control_getdbpath(&request, state->db->db_id);
1636         subreq = ctdb_client_control_send(state, state->ev, state->client,
1637                                           state->destnode, TIMEOUT(),
1638                                           &request);
1639         if (tevent_req_nomem(subreq, req)) {
1640                 return;
1641         }
1642         tevent_req_set_callback(subreq, recover_db_path_done, req);
1643 }
1644
1645 static void recover_db_path_done(struct tevent_req *subreq)
1646 {
1647         struct tevent_req *req = tevent_req_callback_data(
1648                 subreq, struct tevent_req);
1649         struct recover_db_state *state = tevent_req_data(
1650                 req, struct recover_db_state);
1651         struct ctdb_reply_control *reply;
1652         struct ctdb_req_control request;
1653         int ret;
1654         bool status;
1655
1656         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1657         TALLOC_FREE(subreq);
1658         if (! status) {
1659                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1660                       state->db_name, ret);
1661                 tevent_req_error(req, ret);
1662                 return;
1663         }
1664
1665         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1666         if (ret != 0) {
1667                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1668                       state->db_name, ret);
1669                 tevent_req_error(req, EPROTO);
1670                 return;
1671         }
1672
1673         talloc_free(reply);
1674
1675         ctdb_req_control_db_freeze(&request, state->db->db_id);
1676         subreq = ctdb_client_control_multi_send(state,
1677                                                 state->ev,
1678                                                 state->client,
1679                                                 state->nlist->pnn_list,
1680                                                 state->nlist->count,
1681                                                 TIMEOUT(),
1682                                                 &request);
1683         if (tevent_req_nomem(subreq, req)) {
1684                 return;
1685         }
1686         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1687 }
1688
1689 static void recover_db_freeze_done(struct tevent_req *subreq)
1690 {
1691         struct tevent_req *req = tevent_req_callback_data(
1692                 subreq, struct tevent_req);
1693         struct recover_db_state *state = tevent_req_data(
1694                 req, struct recover_db_state);
1695         struct ctdb_req_control request;
1696         int *err_list;
1697         int ret;
1698         bool status;
1699
1700         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1701                                                 NULL);
1702         TALLOC_FREE(subreq);
1703         if (! status) {
1704                 int ret2;
1705                 uint32_t pnn;
1706
1707                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1708                                                        state->nlist->count,
1709                                                        err_list,
1710                                                        &pnn);
1711                 if (ret2 != 0) {
1712                         D_ERR("control FREEZE_DB failed for db %s"
1713                               " on node %u, ret=%d\n",
1714                               state->db_name, pnn, ret2);
1715
1716                         node_list_ban_credits(state->nlist, pnn);
1717                 } else {
1718                         D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1719                               state->db_name, ret);
1720                 }
1721                 tevent_req_error(req, ret);
1722                 return;
1723         }
1724
1725         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1726         subreq = ctdb_client_control_multi_send(state,
1727                                                 state->ev,
1728                                                 state->client,
1729                                                 state->nlist->pnn_list,
1730                                                 state->nlist->count,
1731                                                 TIMEOUT(),
1732                                                 &request);
1733         if (tevent_req_nomem(subreq, req)) {
1734                 return;
1735         }
1736         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1737 }
1738
1739 static void recover_db_transaction_started(struct tevent_req *subreq)
1740 {
1741         struct tevent_req *req = tevent_req_callback_data(
1742                 subreq, struct tevent_req);
1743         struct recover_db_state *state = tevent_req_data(
1744                 req, struct recover_db_state);
1745         int *err_list;
1746         uint32_t flags;
1747         int ret;
1748         bool status;
1749
1750         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1751                                                 NULL);
1752         TALLOC_FREE(subreq);
1753         if (! status) {
1754                 int ret2;
1755                 uint32_t pnn;
1756
1757                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1758                                                        state->nlist->count,
1759                                                        err_list,
1760                                                        &pnn);
1761                 if (ret2 != 0) {
1762                         D_ERR("control TRANSACTION_DB failed for db=%s"
1763                               " on node %u, ret=%d\n",
1764                               state->db_name, pnn, ret2);
1765                 } else {
1766                         D_ERR("control TRANSACTION_DB failed for db=%s,"
1767                               " ret=%d\n", state->db_name, ret);
1768                 }
1769                 tevent_req_error(req, ret);
1770                 return;
1771         }
1772
1773         flags = state->db->db_flags;
1774         state->recdb = recdb_create(state,
1775                                     state->db->db_id,
1776                                     state->db_name,
1777                                     state->db_path,
1778                                     state->tun_list->database_hash_size,
1779                                     flags & CTDB_DB_FLAGS_PERSISTENT);
1780         if (tevent_req_nomem(state->recdb, req)) {
1781                 return;
1782         }
1783
1784         if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
1785             (flags & CTDB_DB_FLAGS_REPLICATED)) {
1786                 subreq = collect_highseqnum_db_send(state,
1787                                                     state->ev,
1788                                                     state->client,
1789                                                     state->nlist,
1790                                                     state->db->db_id,
1791                                                     state->recdb);
1792         } else {
1793                 subreq = collect_all_db_send(state,
1794                                              state->ev,
1795                                              state->client,
1796                                              state->nlist,
1797                                              state->db->db_id,
1798                                              state->recdb);
1799         }
1800         if (tevent_req_nomem(subreq, req)) {
1801                 return;
1802         }
1803         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1804 }
1805
1806 static void recover_db_collect_done(struct tevent_req *subreq)
1807 {
1808         struct tevent_req *req = tevent_req_callback_data(
1809                 subreq, struct tevent_req);
1810         struct recover_db_state *state = tevent_req_data(
1811                 req, struct recover_db_state);
1812         struct ctdb_req_control request;
1813         int ret;
1814         bool status;
1815
1816         if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1817             (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1818                 status = collect_highseqnum_db_recv(subreq, &ret);
1819         } else {
1820                 status = collect_all_db_recv(subreq, &ret);
1821         }
1822         TALLOC_FREE(subreq);
1823         if (! status) {
1824                 tevent_req_error(req, ret);
1825                 return;
1826         }
1827
1828         ctdb_req_control_wipe_database(&request, &state->transdb);
1829         subreq = ctdb_client_control_multi_send(state,
1830                                                 state->ev,
1831                                                 state->client,
1832                                                 state->nlist->pnn_list,
1833                                                 state->nlist->count,
1834                                                 TIMEOUT(),
1835                                                 &request);
1836         if (tevent_req_nomem(subreq, req)) {
1837                 return;
1838         }
1839         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1840 }
1841
1842 static void recover_db_wipedb_done(struct tevent_req *subreq)
1843 {
1844         struct tevent_req *req = tevent_req_callback_data(
1845                 subreq, struct tevent_req);
1846         struct recover_db_state *state = tevent_req_data(
1847                 req, struct recover_db_state);
1848         int *err_list;
1849         int ret;
1850         bool status;
1851
1852         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1853                                                 NULL);
1854         TALLOC_FREE(subreq);
1855         if (! status) {
1856                 int ret2;
1857                 uint32_t pnn;
1858
1859                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1860                                                        state->nlist->count,
1861                                                        err_list,
1862                                                        &pnn);
1863                 if (ret2 != 0) {
1864                         D_ERR("control WIPEDB failed for db %s on node %u,"
1865                               " ret=%d\n", state->db_name, pnn, ret2);
1866                 } else {
1867                         D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1868                               state->db_name, ret);
1869                 }
1870                 tevent_req_error(req, ret);
1871                 return;
1872         }
1873
1874         subreq = push_database_send(state,
1875                                     state->ev,
1876                                     state->client,
1877                                     state->nlist->pnn_list,
1878                                     state->nlist->count,
1879                                     state->recdb,
1880                                     state->tun_list->rec_buffer_size_limit);
1881         if (tevent_req_nomem(subreq, req)) {
1882                 return;
1883         }
1884         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1885 }
1886
1887 static void recover_db_pushdb_done(struct tevent_req *subreq)
1888 {
1889         struct tevent_req *req = tevent_req_callback_data(
1890                 subreq, struct tevent_req);
1891         struct recover_db_state *state = tevent_req_data(
1892                 req, struct recover_db_state);
1893         struct ctdb_req_control request;
1894         int ret;
1895         bool status;
1896
1897         status = push_database_recv(subreq, &ret);
1898         TALLOC_FREE(subreq);
1899         if (! status) {
1900                 tevent_req_error(req, ret);
1901                 return;
1902         }
1903
1904         TALLOC_FREE(state->recdb);
1905
1906         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1907         subreq = ctdb_client_control_multi_send(state,
1908                                                 state->ev,
1909                                                 state->client,
1910                                                 state->nlist->pnn_list,
1911                                                 state->nlist->count,
1912                                                 TIMEOUT(),
1913                                                 &request);
1914         if (tevent_req_nomem(subreq, req)) {
1915                 return;
1916         }
1917         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1918 }
1919
1920 static void recover_db_transaction_committed(struct tevent_req *subreq)
1921 {
1922         struct tevent_req *req = tevent_req_callback_data(
1923                 subreq, struct tevent_req);
1924         struct recover_db_state *state = tevent_req_data(
1925                 req, struct recover_db_state);
1926         struct ctdb_req_control request;
1927         int *err_list;
1928         int ret;
1929         bool status;
1930
1931         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1932                                                 NULL);
1933         TALLOC_FREE(subreq);
1934         if (! status) {
1935                 int ret2;
1936                 uint32_t pnn;
1937
1938                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1939                                                        state->nlist->count,
1940                                                        err_list,
1941                                                        &pnn);
1942                 if (ret2 != 0) {
1943                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1944                               " on node %u, ret=%d\n",
1945                               state->db_name, pnn, ret2);
1946                 } else {
1947                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1948                               " ret=%d\n", state->db_name, ret);
1949                 }
1950                 tevent_req_error(req, ret);
1951                 return;
1952         }
1953
1954         ctdb_req_control_db_thaw(&request, state->db->db_id);
1955         subreq = ctdb_client_control_multi_send(state,
1956                                                 state->ev,
1957                                                 state->client,
1958                                                 state->nlist->pnn_list,
1959                                                 state->nlist->count,
1960                                                 TIMEOUT(),
1961                                                 &request);
1962         if (tevent_req_nomem(subreq, req)) {
1963                 return;
1964         }
1965         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1966 }
1967
1968 static void recover_db_thaw_done(struct tevent_req *subreq)
1969 {
1970         struct tevent_req *req = tevent_req_callback_data(
1971                 subreq, struct tevent_req);
1972         struct recover_db_state *state = tevent_req_data(
1973                 req, struct recover_db_state);
1974         int *err_list;
1975         int ret;
1976         bool status;
1977
1978         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1979                                                 NULL);
1980         TALLOC_FREE(subreq);
1981         if (! status) {
1982                 int ret2;
1983                 uint32_t pnn;
1984
1985                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1986                                                        state->nlist->count,
1987                                                        err_list,
1988                                                        &pnn);
1989                 if (ret2 != 0) {
1990                         D_ERR("control DB_THAW failed for db %s on node %u,"
1991                               " ret=%d\n", state->db_name, pnn, ret2);
1992                 } else {
1993                         D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1994                               state->db_name, ret);
1995                 }
1996                 tevent_req_error(req, ret);
1997                 return;
1998         }
1999
2000         tevent_req_done(req);
2001 }
2002
2003 static bool recover_db_recv(struct tevent_req *req)
2004 {
2005         return generic_recv(req, NULL);
2006 }
2007
2008
2009 /*
2010  * Start database recovery for each database
2011  *
2012  * Try to recover each database NUM_RETRIES times before failing recovery.
2013  */
2014
/*
 * State of db_recovery_send(): tracks the recovery of every database
 * in dblist.
 */
2015 struct db_recovery_state {
2016         struct tevent_context *ev;
2017         struct db_list *dblist;
2018         unsigned int num_replies; /* databases finished, successfully or not */
2019         unsigned int num_failed; /* databases whose recovery finally failed */
2020 };
2021
/*
 * Per-database state passed to db_recovery_one_done(); it keeps the
 * arguments needed to call recover_db_send() again after a failure.
 */
2022 struct db_recovery_one_state {
2023         struct tevent_req *req; /* the enclosing db_recovery_send() request */
2024         struct ctdb_client_context *client;
2025         struct db_list *dblist;
2026         struct ctdb_tunable_list *tun_list;
2027         struct node_list *nlist;
2028         uint32_t generation;
2029         struct db *db;
2030         int num_fails; /* failed recovery attempts for this database so far */
2031 };
2032
/* Callback run each time a recover_db_send() request for one database ends */
2033 static void db_recovery_one_done(struct tevent_req *subreq);
2034
2035 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2036                                            struct tevent_context *ev,
2037                                            struct ctdb_client_context *client,
2038                                            struct db_list *dblist,
2039                                            struct ctdb_tunable_list *tun_list,
2040                                            struct node_list *nlist,
2041                                            uint32_t generation)
2042 {
2043         struct tevent_req *req, *subreq;
2044         struct db_recovery_state *state;
2045         struct db *db;
2046
2047         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2048         if (req == NULL) {
2049                 return NULL;
2050         }
2051
2052         state->ev = ev;
2053         state->dblist = dblist;
2054         state->num_replies = 0;
2055         state->num_failed = 0;
2056
2057         if (dblist->num_dbs == 0) {
2058                 tevent_req_done(req);
2059                 return tevent_req_post(req, ev);
2060         }
2061
2062         for (db = dblist->db; db != NULL; db = db->next) {
2063                 struct db_recovery_one_state *substate;
2064
2065                 substate = talloc_zero(state, struct db_recovery_one_state);
2066                 if (tevent_req_nomem(substate, req)) {
2067                         return tevent_req_post(req, ev);
2068                 }
2069
2070                 substate->req = req;
2071                 substate->client = client;
2072                 substate->dblist = dblist;
2073                 substate->tun_list = tun_list;
2074                 substate->nlist = nlist;
2075                 substate->generation = generation;
2076                 substate->db = db;
2077
2078                 subreq = recover_db_send(state,
2079                                          ev,
2080                                          client,
2081                                          tun_list,
2082                                          nlist,
2083                                          generation,
2084                                          substate->db);
2085                 if (tevent_req_nomem(subreq, req)) {
2086                         return tevent_req_post(req, ev);
2087                 }
2088                 tevent_req_set_callback(subreq, db_recovery_one_done,
2089                                         substate);
2090                 D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
2091         }
2092
2093         return req;
2094 }
2095
2096 static void db_recovery_one_done(struct tevent_req *subreq)
2097 {
2098         struct db_recovery_one_state *substate = tevent_req_callback_data(
2099                 subreq, struct db_recovery_one_state);
2100         struct tevent_req *req = substate->req;
2101         struct db_recovery_state *state = tevent_req_data(
2102                 req, struct db_recovery_state);
2103         bool status;
2104
2105         status = recover_db_recv(subreq);
2106         TALLOC_FREE(subreq);
2107
2108         if (status) {
2109                 talloc_free(substate);
2110                 goto done;
2111         }
2112
2113         substate->num_fails += 1;
2114         if (substate->num_fails < NUM_RETRIES) {
2115                 subreq = recover_db_send(state,
2116                                          state->ev,
2117                                          substate->client,
2118                                          substate->tun_list,
2119                                          substate->nlist,
2120                                          substate->generation,
2121                                          substate->db);
2122                 if (tevent_req_nomem(subreq, req)) {
2123                         goto failed;
2124                 }
2125                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2126                 D_NOTICE("recover database 0x%08x, attempt %d\n",
2127                          substate->db->db_id, substate->num_fails+1);
2128                 return;
2129         }
2130
2131 failed:
2132         state->num_failed += 1;
2133
2134 done:
2135         state->num_replies += 1;
2136
2137         if (state->num_replies == state->dblist->num_dbs) {
2138                 tevent_req_done(req);
2139         }
2140 }
2141
2142 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2143 {
2144         struct db_recovery_state *state = tevent_req_data(
2145                 req, struct db_recovery_state);
2146         int err;
2147
2148         if (tevent_req_is_unix_error(req, &err)) {
2149                 *count = 0;
2150                 return false;
2151         }
2152
2153         *count = state->num_replies - state->num_failed;
2154
2155         if (state->num_failed > 0) {
2156                 return false;
2157         }
2158
2159         return true;
2160 }
2161
/* State of ban_node_send() */
2162 struct ban_node_state {
2163         struct tevent_context *ev;
2164         struct ctdb_client_context *client;
2165         struct ctdb_tunable_list *tun_list;
2166         struct node_list *nlist;
2167         uint32_t destnode; /* set from ctdb_client_pnn(client) */
2168
2169         uint32_t max_pnn; /* node with the most ban credits, see ban_node_check() */
2170 };
2171
/* Helper and step callbacks of ban_node_send(), defined below */
2172 static bool ban_node_check(struct tevent_req *req);
2173 static void ban_node_check_done(struct tevent_req *subreq);
2174 static void ban_node_done(struct tevent_req *subreq);
2175
2176 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2177                                         struct tevent_context *ev,
2178                                         struct ctdb_client_context *client,
2179                                         struct ctdb_tunable_list *tun_list,
2180                                         struct node_list *nlist)
2181 {
2182         struct tevent_req *req;
2183         struct ban_node_state *state;
2184         bool ok;
2185
2186         req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2187         if (req == NULL) {
2188                 return NULL;
2189         }
2190
2191         state->ev = ev;
2192         state->client = client;
2193         state->tun_list = tun_list;
2194         state->nlist = nlist;
2195         state->destnode = ctdb_client_pnn(client);
2196
2197         /* Bans are not enabled */
2198         if (state->tun_list->enable_bans == 0) {
2199                 D_ERR("Bans are not enabled\n");
2200                 tevent_req_done(req);
2201                 return tevent_req_post(req, ev);
2202         }
2203
2204         ok = ban_node_check(req);
2205         if (!ok) {
2206                 return tevent_req_post(req, ev);
2207         }
2208
2209         return req;
2210 }
2211
2212 static bool ban_node_check(struct tevent_req *req)
2213 {
2214         struct tevent_req *subreq;
2215         struct ban_node_state *state = tevent_req_data(
2216                 req, struct ban_node_state);
2217         struct ctdb_req_control request;
2218         unsigned max_credits = 0, i;
2219
2220         for (i=0; i<state->nlist->count; i++) {
2221                 if (state->nlist->ban_credits[i] > max_credits) {
2222                         state->max_pnn = state->nlist->pnn_list[i];
2223                         max_credits = state->nlist->ban_credits[i];
2224                 }
2225         }
2226
2227         if (max_credits < NUM_RETRIES) {
2228                 tevent_req_done(req);
2229                 return false;
2230         }
2231
2232         ctdb_req_control_get_nodemap(&request);
2233         subreq = ctdb_client_control_send(state,
2234                                           state->ev,
2235                                           state->client,
2236                                           state->max_pnn,
2237                                           TIMEOUT(),
2238                                           &request);
2239         if (tevent_req_nomem(subreq, req)) {
2240                 return false;
2241         }
2242         tevent_req_set_callback(subreq, ban_node_check_done, req);
2243
2244         return true;
2245 }
2246
2247 static void ban_node_check_done(struct tevent_req *subreq)
2248 {
2249         struct tevent_req *req = tevent_req_callback_data(
2250                 subreq, struct tevent_req);
2251         struct ban_node_state *state = tevent_req_data(
2252                 req, struct ban_node_state);
2253         struct ctdb_reply_control *reply;
2254         struct ctdb_node_map *nodemap;
2255         struct ctdb_req_control request;
2256         struct ctdb_ban_state ban;
2257         unsigned int i;
2258         int ret;
2259         bool ok;
2260
2261         ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2262         TALLOC_FREE(subreq);
2263         if (!ok) {
2264                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2265                       state->max_pnn, ret);
2266                 tevent_req_error(req, ret);
2267                 return;
2268         }
2269
2270         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2271         if (ret != 0) {
2272                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2273                 tevent_req_error(req, ret);
2274                 return;
2275         }
2276
2277         for (i=0; i<nodemap->num; i++) {
2278                 if (nodemap->node[i].pnn != state->max_pnn) {
2279                         continue;
2280                 }
2281
2282                 /* If the node became inactive, reset ban_credits */
2283                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2284                         unsigned int j;
2285
2286                         for (j=0; j<state->nlist->count; j++) {
2287                                 if (state->nlist->pnn_list[j] ==
2288                                                 state->max_pnn) {
2289                                         state->nlist->ban_credits[j] = 0;
2290                                         break;
2291                                 }
2292                         }
2293                         state->max_pnn = CTDB_UNKNOWN_PNN;
2294                 }
2295         }
2296
2297         talloc_free(nodemap);
2298         talloc_free(reply);
2299
2300         /* If node becomes inactive during recovery, pick next */
2301         if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2302                 (void) ban_node_check(req);
2303                 return;
2304         }
2305
2306         ban = (struct ctdb_ban_state) {
2307                 .pnn = state->max_pnn,
2308                 .time = state->tun_list->recovery_ban_period,
2309         };
2310
2311         D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2312
2313         ctdb_req_control_set_ban_state(&request, &ban);
2314         subreq = ctdb_client_control_send(state,
2315                                           state->ev,
2316                                           state->client,
2317                                           ban.pnn,
2318                                           TIMEOUT(),
2319                                           &request);
2320         if (tevent_req_nomem(subreq, req)) {
2321                 return;
2322         }
2323         tevent_req_set_callback(subreq, ban_node_done, req);
2324 }
2325
2326 static void ban_node_done(struct tevent_req *subreq)
2327 {
2328         struct tevent_req *req = tevent_req_callback_data(
2329                 subreq, struct tevent_req);
2330         struct ban_node_state *state = tevent_req_data(
2331                 req, struct ban_node_state);
2332         struct ctdb_reply_control *reply;
2333         int ret;
2334         bool status;
2335
2336         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2337         TALLOC_FREE(subreq);
2338         if (! status) {
2339                 tevent_req_error(req, ret);
2340                 return;
2341         }
2342
2343         ret = ctdb_reply_control_set_ban_state(reply);
2344         if (ret != 0) {
2345                 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2346                 tevent_req_error(req, ret);
2347                 return;
2348         }
2349
2350         talloc_free(reply);
2351         tevent_req_done(req);
2352 }
2353
/*
 * Collect the result of ban_node_send().
 *
 * Returns false if the request failed; perr is handed straight to
 * tevent_req_is_unix_error().
 */
static bool ban_node_recv(struct tevent_req *req, int *perr)
{
        bool failed = tevent_req_is_unix_error(req, perr);

        return !failed;
}
2362
2363 /*
2364  * Run the parallel database recovery
2365  *
2366  * - Get tunables
2367  * - Get nodemap from all nodes
2368  * - Get capabilities from all nodes
2369  * - Get dbmap
2370  * - Set RECOVERY_ACTIVE
2371  * - Send START_RECOVERY
2372  * - Update vnnmap on all nodes
2373  * - Run database recovery
2374  * - Set RECOVERY_NORMAL
2375  * - Send END_RECOVERY
2376  */
2377
/* State of recovery_send(), shared by all of its step callbacks */
2378 struct recovery_state {
2379         struct tevent_context *ev;
2380         struct ctdb_client_context *client;
2381         uint32_t generation;
2382         uint32_t destnode; /* set from ctdb_client_pnn(client) */
2383         struct node_list *nlist; /* nodes not flagged as disconnected */
2384         struct ctdb_tunable_list *tun_list; /* from the GET_ALL_TUNABLES reply */
2385         struct ctdb_vnn_map *vnnmap;
2386         struct db_list *dblist;
2387 };
2388
/*
 * Callbacks used by the recovery request started in recovery_send();
 * presumably one per step listed in the comment above.
 */
2389 static void recovery_tunables_done(struct tevent_req *subreq);
2390 static void recovery_nodemap_done(struct tevent_req *subreq);
2391 static void recovery_nodemap_verify(struct tevent_req *subreq);
2392 static void recovery_capabilities_done(struct tevent_req *subreq);
2393 static void recovery_dbmap_done(struct tevent_req *subreq);
2394 static void recovery_active_done(struct tevent_req *subreq);
2395 static void recovery_start_recovery_done(struct tevent_req *subreq);
2396 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2397 static void recovery_db_recovery_done(struct tevent_req *subreq);
2398 static void recovery_failed_done(struct tevent_req *subreq);
2399 static void recovery_normal_done(struct tevent_req *subreq);
2400 static void recovery_end_recovery_done(struct tevent_req *subreq);
2401
2402 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2403                                         struct tevent_context *ev,
2404                                         struct ctdb_client_context *client,
2405                                         uint32_t generation)
2406 {
2407         struct tevent_req *req, *subreq;
2408         struct recovery_state *state;
2409         struct ctdb_req_control request;
2410
2411         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2412         if (req == NULL) {
2413                 return NULL;
2414         }
2415
2416         state->ev = ev;
2417         state->client = client;
2418         state->generation = generation;
2419         state->destnode = ctdb_client_pnn(client);
2420
2421         ctdb_req_control_get_all_tunables(&request);
2422         subreq = ctdb_client_control_send(state, state->ev, state->client,
2423                                           state->destnode, TIMEOUT(),
2424                                           &request);
2425         if (tevent_req_nomem(subreq, req)) {
2426                 return tevent_req_post(req, ev);
2427         }
2428         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2429
2430         return req;
2431 }
2432
2433 static void recovery_tunables_done(struct tevent_req *subreq)
2434 {
2435         struct tevent_req *req = tevent_req_callback_data(
2436                 subreq, struct tevent_req);
2437         struct recovery_state *state = tevent_req_data(
2438                 req, struct recovery_state);
2439         struct ctdb_reply_control *reply;
2440         struct ctdb_req_control request;
2441         int ret;
2442         bool status;
2443
2444         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2445         TALLOC_FREE(subreq);
2446         if (! status) {
2447                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2448                 tevent_req_error(req, ret);
2449                 return;
2450         }
2451
2452         ret = ctdb_reply_control_get_all_tunables(reply, state,
2453                                                   &state->tun_list);
2454         if (ret != 0) {
2455                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2456                 tevent_req_error(req, EPROTO);
2457                 return;
2458         }
2459
2460         talloc_free(reply);
2461
2462         recover_timeout = state->tun_list->recover_timeout;
2463
2464         ctdb_req_control_get_nodemap(&request);
2465         subreq = ctdb_client_control_send(state, state->ev, state->client,
2466                                           state->destnode, TIMEOUT(),
2467                                           &request);
2468         if (tevent_req_nomem(subreq, req)) {
2469                 return;
2470         }
2471         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2472 }
2473
2474 static void recovery_nodemap_done(struct tevent_req *subreq)
2475 {
2476         struct tevent_req *req = tevent_req_callback_data(
2477                 subreq, struct tevent_req);
2478         struct recovery_state *state = tevent_req_data(
2479                 req, struct recovery_state);
2480         struct ctdb_reply_control *reply;
2481         struct ctdb_req_control request;
2482         struct ctdb_node_map *nodemap;
2483         unsigned int i;
2484         bool status;
2485         int ret;
2486
2487         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2488         TALLOC_FREE(subreq);
2489         if (! status) {
2490                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2491                       state->destnode, ret);
2492                 tevent_req_error(req, ret);
2493                 return;
2494         }
2495
2496         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2497         if (ret != 0) {
2498                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2499                 tevent_req_error(req, ret);
2500                 return;
2501         }
2502
2503         state->nlist = node_list_init(state, nodemap->num);
2504         if (tevent_req_nomem(state->nlist, req)) {
2505                 return;
2506         }
2507
2508         for (i=0; i<nodemap->num; i++) {
2509                 bool ok;
2510
2511                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2512                         continue;
2513                 }
2514
2515                 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2516                 if (!ok) {
2517                         tevent_req_error(req, EINVAL);
2518                         return;
2519                 }
2520         }
2521
2522         talloc_free(nodemap);
2523         talloc_free(reply);
2524
2525         /* Verify flags by getting local node information from each node */
2526         ctdb_req_control_get_nodemap(&request);
2527         subreq = ctdb_client_control_multi_send(state,
2528                                                 state->ev,
2529                                                 state->client,
2530                                                 state->nlist->pnn_list,
2531                                                 state->nlist->count,
2532                                                 TIMEOUT(),
2533                                                 &request);
2534         if (tevent_req_nomem(subreq, req)) {
2535                 return;
2536         }
2537         tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
2538 }
2539
2540 static void recovery_nodemap_verify(struct tevent_req *subreq)
2541 {
2542         struct tevent_req *req = tevent_req_callback_data(
2543                 subreq, struct tevent_req);
2544         struct recovery_state *state = tevent_req_data(
2545                 req, struct recovery_state);
2546         struct ctdb_req_control request;
2547         struct ctdb_reply_control **reply;
2548         struct node_list *nlist;
2549         unsigned int i;
2550         int *err_list;
2551         int ret;
2552         bool status;
2553
2554         status = ctdb_client_control_multi_recv(subreq,
2555                                                 &ret,
2556                                                 state,
2557                                                 &err_list,
2558                                                 &reply);
2559         TALLOC_FREE(subreq);
2560         if (! status) {
2561                 int ret2;
2562                 uint32_t pnn;
2563
2564                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2565                                                        state->nlist->count,
2566                                                        err_list,
2567                                                        &pnn);
2568                 if (ret2 != 0) {
2569                         D_ERR("control GET_NODEMAP failed on node %u,"
2570                               " ret=%d\n", pnn, ret2);
2571                 } else {
2572                         D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2573                 }
2574                 tevent_req_error(req, ret);
2575                 return;
2576         }
2577
2578         nlist = node_list_init(state, state->nlist->size);
2579         if (tevent_req_nomem(nlist, req)) {
2580                 return;
2581         }
2582
2583         for (i=0; i<state->nlist->count; i++) {
2584                 struct ctdb_node_map *nodemap = NULL;
2585                 uint32_t pnn, flags;
2586                 unsigned int j;
2587                 bool ok;
2588
2589                 pnn = state->nlist->pnn_list[i];
2590                 ret = ctdb_reply_control_get_nodemap(reply[i],
2591                                                      state,
2592                                                      &nodemap);
2593                 if (ret != 0) {
2594                         D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2595                         tevent_req_error(req, EPROTO);
2596                         return;
2597                 }
2598
2599                 flags = NODE_FLAGS_DISCONNECTED;
2600                 for (j=0; j<nodemap->num; j++) {
2601                         if (nodemap->node[j].pnn == pnn) {
2602                                 flags = nodemap->node[j].flags;
2603                                 break;
2604                         }
2605                 }
2606
2607                 TALLOC_FREE(nodemap);
2608
2609                 if (flags & NODE_FLAGS_INACTIVE) {
2610                         continue;
2611                 }
2612
2613                 ok = node_list_add(nlist, pnn);
2614                 if (!ok) {
2615                         tevent_req_error(req, EINVAL);
2616                         return;
2617                 }
2618         }
2619
2620         talloc_free(reply);
2621
2622         talloc_free(state->nlist);
2623         state->nlist = nlist;
2624
2625         ctdb_req_control_get_capabilities(&request);
2626         subreq = ctdb_client_control_multi_send(state,
2627                                                 state->ev,
2628                                                 state->client,
2629                                                 state->nlist->pnn_list,
2630                                                 state->nlist->count,
2631                                                 TIMEOUT(),
2632                                                 &request);
2633         if (tevent_req_nomem(subreq, req)) {
2634                 return;
2635         }
2636         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2637 }
2638
2639 static void recovery_capabilities_done(struct tevent_req *subreq)
2640 {
2641         struct tevent_req *req = tevent_req_callback_data(
2642                 subreq, struct tevent_req);
2643         struct recovery_state *state = tevent_req_data(
2644                 req, struct recovery_state);
2645         struct ctdb_reply_control **reply;
2646         struct ctdb_req_control request;
2647         int *err_list;
2648         unsigned int i;
2649         int ret;
2650         bool status;
2651
2652         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2653                                                 &reply);
2654         TALLOC_FREE(subreq);
2655         if (! status) {
2656                 int ret2;
2657                 uint32_t pnn;
2658
2659                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2660                                                        state->nlist->count,
2661                                                        err_list,
2662                                                        &pnn);
2663                 if (ret2 != 0) {
2664                         D_ERR("control GET_CAPABILITIES failed on node %u,"
2665                               " ret=%d\n", pnn, ret2);
2666                 } else {
2667                         D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2668                               ret);
2669                 }
2670                 tevent_req_error(req, ret);
2671                 return;
2672         }
2673
2674         for (i=0; i<state->nlist->count; i++) {
2675                 uint32_t caps;
2676
2677                 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
2678                 if (ret != 0) {
2679                         D_ERR("control GET_CAPABILITIES failed on node %u\n",
2680                               state->nlist->pnn_list[i]);
2681                         tevent_req_error(req, EPROTO);
2682                         return;
2683                 }
2684
2685                 state->nlist->caps[i] = caps;
2686         }
2687
2688         talloc_free(reply);
2689
2690         ctdb_req_control_get_dbmap(&request);
2691         subreq = ctdb_client_control_multi_send(state,
2692                                                 state->ev,
2693                                                 state->client,
2694                                                 state->nlist->pnn_list,
2695                                                 state->nlist->count,
2696                                                 TIMEOUT(),
2697                                                 &request);
2698         if (tevent_req_nomem(subreq, req)) {
2699                 return;
2700         }
2701         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2702 }
2703
2704 static void recovery_dbmap_done(struct tevent_req *subreq)
2705 {
2706         struct tevent_req *req = tevent_req_callback_data(
2707                 subreq, struct tevent_req);
2708         struct recovery_state *state = tevent_req_data(
2709                 req, struct recovery_state);
2710         struct ctdb_reply_control **reply;
2711         struct ctdb_req_control request;
2712         int *err_list;
2713         unsigned int i, j;
2714         int ret;
2715         bool status;
2716
2717         status = ctdb_client_control_multi_recv(subreq,
2718                                                 &ret,
2719                                                 state,
2720                                                 &err_list,
2721                                                 &reply);
2722         TALLOC_FREE(subreq);
2723         if (! status) {
2724                 int ret2;
2725                 uint32_t pnn;
2726
2727                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2728                                                        state->nlist->count,
2729                                                        err_list,
2730                                                        &pnn);
2731                 if (ret2 != 0) {
2732                         D_ERR("control GET_DBMAP failed on node %u,"
2733                               " ret=%d\n", pnn, ret2);
2734                 } else {
2735                         D_ERR("control GET_DBMAP failed, ret=%d\n",
2736                               ret);
2737                 }
2738                 tevent_req_error(req, ret);
2739                 return;
2740         }
2741
2742         state->dblist = db_list_init(state, state->nlist->count);
2743         if (tevent_req_nomem(state->dblist, req)) {
2744                 D_ERR("memory allocation error\n");
2745                 return;
2746         }
2747
2748         for (i = 0; i < state->nlist->count; i++) {
2749                 struct ctdb_dbid_map *dbmap = NULL;
2750                 uint32_t pnn;
2751
2752                 pnn = state->nlist->pnn_list[i];
2753
2754                 ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
2755                 if (ret != 0) {
2756                         D_ERR("control GET_DBMAP failed on node %u\n",
2757                               pnn);
2758                         tevent_req_error(req, EPROTO);
2759                         return;
2760                 }
2761
2762                 for (j = 0; j < dbmap->num; j++) {
2763                         ret = db_list_check_and_add(state->dblist,
2764                                                     dbmap->dbs[j].db_id,
2765                                                     dbmap->dbs[j].flags,
2766                                                     pnn);
2767                         if (ret != 0) {
2768                                 D_ERR("failed to add database list entry, "
2769                                       "ret=%d\n",
2770                                       ret);
2771                                 tevent_req_error(req, ret);
2772                                 return;
2773                         }
2774                 }
2775
2776                 TALLOC_FREE(dbmap);
2777         }
2778
2779         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2780         subreq = ctdb_client_control_multi_send(state,
2781                                                 state->ev,
2782                                                 state->client,
2783                                                 state->nlist->pnn_list,
2784                                                 state->nlist->count,
2785                                                 TIMEOUT(),
2786                                                 &request);
2787         if (tevent_req_nomem(subreq, req)) {
2788                 return;
2789         }
2790         tevent_req_set_callback(subreq, recovery_active_done, req);
2791 }
2792
2793 static void recovery_active_done(struct tevent_req *subreq)
2794 {
2795         struct tevent_req *req = tevent_req_callback_data(
2796                 subreq, struct tevent_req);
2797         struct recovery_state *state = tevent_req_data(
2798                 req, struct recovery_state);
2799         struct ctdb_req_control request;
2800         struct ctdb_vnn_map *vnnmap;
2801         int *err_list;
2802         int ret;
2803         bool status;
2804
2805         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2806                                                 NULL);
2807         TALLOC_FREE(subreq);
2808         if (! status) {
2809                 int ret2;
2810                 uint32_t pnn;
2811
2812                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2813                                                        state->nlist->count,
2814                                                        err_list,
2815                                                        &pnn);
2816                 if (ret2 != 0) {
2817                         D_ERR("failed to set recovery mode ACTIVE on node %u,"
2818                               " ret=%d\n", pnn, ret2);
2819                 } else {
2820                         D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2821                               ret);
2822                 }
2823                 tevent_req_error(req, ret);
2824                 return;
2825         }
2826
2827         D_ERR("Set recovery mode to ACTIVE\n");
2828
2829         /* Calculate new VNNMAP */
2830         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2831         if (tevent_req_nomem(vnnmap, req)) {
2832                 return;
2833         }
2834
2835         vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
2836         if (tevent_req_nomem(vnnmap->map, req)) {
2837                 return;
2838         }
2839
2840         if (vnnmap->size == 0) {
2841                 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2842                 vnnmap->map[0] = state->destnode;
2843                 vnnmap->size = 1;
2844         }
2845
2846         vnnmap->generation = state->generation;
2847
2848         state->vnnmap = vnnmap;
2849
2850         ctdb_req_control_start_recovery(&request);
2851         subreq = ctdb_client_control_multi_send(state,
2852                                                 state->ev,
2853                                                 state->client,
2854                                                 state->nlist->pnn_list,
2855                                                 state->nlist->count,
2856                                                 TIMEOUT(),
2857                                                 &request);
2858         if (tevent_req_nomem(subreq, req)) {
2859                 return;
2860         }
2861         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2862 }
2863
2864 static void recovery_start_recovery_done(struct tevent_req *subreq)
2865 {
2866         struct tevent_req *req = tevent_req_callback_data(
2867                 subreq, struct tevent_req);
2868         struct recovery_state *state = tevent_req_data(
2869                 req, struct recovery_state);
2870         struct ctdb_req_control request;
2871         int *err_list;
2872         int ret;
2873         bool status;
2874
2875         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2876                                                 NULL);
2877         TALLOC_FREE(subreq);
2878         if (! status) {
2879                 int ret2;
2880                 uint32_t pnn;
2881
2882                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2883                                                        state->nlist->count,
2884                                                        err_list,
2885                                                        &pnn);
2886                 if (ret2 != 0) {
2887                         D_ERR("failed to run start_recovery event on node %u,"
2888                               " ret=%d\n", pnn, ret2);
2889                 } else {
2890                         D_ERR("failed to run start_recovery event, ret=%d\n",
2891                               ret);
2892                 }
2893                 tevent_req_error(req, ret);
2894                 return;
2895         }
2896
2897         D_ERR("start_recovery event finished\n");
2898
2899         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2900         subreq = ctdb_client_control_multi_send(state,
2901                                                 state->ev,
2902                                                 state->client,
2903                                                 state->nlist->pnn_list,
2904                                                 state->nlist->count,
2905                                                 TIMEOUT(),
2906                                                 &request);
2907         if (tevent_req_nomem(subreq, req)) {
2908                 return;
2909         }
2910         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2911 }
2912
2913 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2914 {
2915         struct tevent_req *req = tevent_req_callback_data(
2916                 subreq, struct tevent_req);
2917         struct recovery_state *state = tevent_req_data(
2918                 req, struct recovery_state);
2919         int *err_list;
2920         int ret;
2921         bool status;
2922
2923         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2924                                                 NULL);
2925         TALLOC_FREE(subreq);
2926         if (! status) {
2927                 int ret2;
2928                 uint32_t pnn;
2929
2930                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2931                                                        state->nlist->count,
2932                                                        err_list,
2933                                                        &pnn);
2934                 if (ret2 != 0) {
2935                         D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2936                               pnn, ret2);
2937                 } else {
2938                         D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2939                 }
2940                 tevent_req_error(req, ret);
2941                 return;
2942         }
2943
2944         D_NOTICE("updated VNNMAP\n");
2945
2946         subreq = db_recovery_send(state,
2947                                   state->ev,
2948                                   state->client,
2949                                   state->dblist,
2950                                   state->tun_list,
2951                                   state->nlist,
2952                                   state->vnnmap->generation);
2953         if (tevent_req_nomem(subreq, req)) {
2954                 return;
2955         }
2956         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2957 }
2958
2959 static void recovery_db_recovery_done(struct tevent_req *subreq)
2960 {
2961         struct tevent_req *req = tevent_req_callback_data(
2962                 subreq, struct tevent_req);
2963         struct recovery_state *state = tevent_req_data(
2964                 req, struct recovery_state);
2965         struct ctdb_req_control request;
2966         bool status;
2967         unsigned int count;
2968
2969         status = db_recovery_recv(subreq, &count);
2970         TALLOC_FREE(subreq);
2971
2972         D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
2973
2974         if (! status) {
2975                 subreq = ban_node_send(state,
2976                                        state->ev,
2977                                        state->client,
2978                                        state->tun_list,
2979                                        state->nlist);
2980                 if (tevent_req_nomem(subreq, req)) {
2981                         return;
2982                 }
2983                 tevent_req_set_callback(subreq, recovery_failed_done, req);
2984                 return;
2985         }
2986
2987         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2988         subreq = ctdb_client_control_multi_send(state,
2989                                                 state->ev,
2990                                                 state->client,
2991                                                 state->nlist->pnn_list,
2992                                                 state->nlist->count,
2993                                                 TIMEOUT(),
2994                                                 &request);
2995         if (tevent_req_nomem(subreq, req)) {
2996                 return;
2997         }
2998         tevent_req_set_callback(subreq, recovery_normal_done, req);
2999 }
3000
3001 static void recovery_failed_done(struct tevent_req *subreq)
3002 {
3003         struct tevent_req *req = tevent_req_callback_data(
3004                 subreq, struct tevent_req);
3005         int ret;
3006         bool status;
3007
3008         status = ban_node_recv(subreq, &ret);
3009         TALLOC_FREE(subreq);
3010         if (! status) {
3011                 D_ERR("failed to ban node, ret=%d\n", ret);
3012         }
3013
3014         tevent_req_error(req, EIO);
3015 }
3016
3017 static void recovery_normal_done(struct tevent_req *subreq)
3018 {
3019         struct tevent_req *req = tevent_req_callback_data(
3020                 subreq, struct tevent_req);
3021         struct recovery_state *state = tevent_req_data(
3022                 req, struct recovery_state);
3023         struct ctdb_req_control request;
3024         int *err_list;
3025         int ret;
3026         bool status;
3027
3028         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3029                                                 NULL);
3030         TALLOC_FREE(subreq);
3031         if (! status) {
3032                 int ret2;
3033                 uint32_t pnn;
3034
3035                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3036                                                        state->nlist->count,
3037                                                        err_list,
3038                                                        &pnn);
3039                 if (ret2 != 0) {
3040                         D_ERR("failed to set recovery mode NORMAL on node %u,"
3041                               " ret=%d\n", pnn, ret2);
3042                 } else {
3043                         D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3044                               ret);
3045                 }
3046                 tevent_req_error(req, ret);
3047                 return;
3048         }
3049
3050         D_ERR("Set recovery mode to NORMAL\n");
3051
3052         ctdb_req_control_end_recovery(&request);
3053         subreq = ctdb_client_control_multi_send(state,
3054                                                 state->ev,
3055                                                 state->client,
3056                                                 state->nlist->pnn_list,
3057                                                 state->nlist->count,
3058                                                 TIMEOUT(),
3059                                                 &request);
3060         if (tevent_req_nomem(subreq, req)) {
3061                 return;
3062         }
3063         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
3064 }
3065
3066 static void recovery_end_recovery_done(struct tevent_req *subreq)
3067 {
3068         struct tevent_req *req = tevent_req_callback_data(
3069                 subreq, struct tevent_req);
3070         struct recovery_state *state = tevent_req_data(
3071                 req, struct recovery_state);
3072         int *err_list;
3073         int ret;
3074         bool status;
3075
3076         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3077                                                 NULL);
3078         TALLOC_FREE(subreq);
3079         if (! status) {
3080                 int ret2;
3081                 uint32_t pnn;
3082
3083                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3084                                                        state->nlist->count,
3085                                                        err_list,
3086                                                        &pnn);
3087                 if (ret2 != 0) {
3088                         D_ERR("failed to run recovered event on node %u,"
3089                               " ret=%d\n", pnn, ret2);
3090                 } else {
3091                         D_ERR("failed to run recovered event, ret=%d\n", ret);
3092                 }
3093                 tevent_req_error(req, ret);
3094                 return;
3095         }
3096
3097         D_ERR("recovered event finished\n");
3098
3099         tevent_req_done(req);
3100 }
3101
/*
 * Collect the result of a recovery request.
 *
 * Delegates to generic_recv(), which stores the unix error in *perr
 * when the request failed.  The boolean result is deliberately
 * dropped, so callers must inspect *perr.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        (void)generic_recv(req, perr);
}
3106
/*
 * Print the command line synopsis for progname to stderr.
 */
static void usage(const char *progname)
{
        static const char fmt[] =
                "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n";

        fprintf(stderr, fmt, progname);
}
3112
3113
3114 /*
3115  * Arguments - log fd, write fd, socket path, generation
3116  */
3117 int main(int argc, char *argv[])
3118 {
3119         int write_fd;
3120         const char *sockpath;
3121         TALLOC_CTX *mem_ctx = NULL;
3122         struct tevent_context *ev;
3123         struct ctdb_client_context *client;
3124         bool status;
3125         int ret = 0;
3126         struct tevent_req *req;
3127         uint32_t generation;
3128
3129         if (argc != 4) {
3130                 usage(argv[0]);
3131                 exit(1);
3132         }
3133
3134         write_fd = atoi(argv[1]);
3135         sockpath = argv[2];
3136         generation = (uint32_t)smb_strtoul(argv[3],
3137                                            NULL,
3138                                            0,
3139                                            &ret,
3140                                            SMB_STR_STANDARD);
3141         if (ret != 0) {
3142                 fprintf(stderr, "recovery: unable to initialize generation\n");
3143                 goto failed;
3144         }
3145
3146         mem_ctx = talloc_new(NULL);
3147         if (mem_ctx == NULL) {
3148                 fprintf(stderr, "recovery: talloc_new() failed\n");
3149                 goto failed;
3150         }
3151
3152         ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3153         if (ret != 0) {
3154                 fprintf(stderr, "recovery: Unable to initialize logging\n");
3155                 goto failed;
3156         }
3157
3158         ev = tevent_context_init(mem_ctx);
3159         if (ev == NULL) {
3160                 D_ERR("tevent_context_init() failed\n");
3161                 goto failed;
3162         }
3163
3164         status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL);
3165         if (!status) {
3166                 D_ERR("logging_setup_sighup_handler() failed\n");
3167                 goto failed;
3168         }
3169
3170         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3171         if (ret != 0) {
3172                 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3173                 goto failed;
3174         }
3175
3176         req = recovery_send(mem_ctx, ev, client, generation);
3177         if (req == NULL) {
3178                 D_ERR("database_recover_send() failed\n");
3179                 goto failed;
3180         }
3181
3182         if (! tevent_req_poll(req, ev)) {
3183                 D_ERR("tevent_req_poll() failed\n");
3184                 goto failed;
3185         }
3186
3187         recovery_recv(req, &ret);
3188         TALLOC_FREE(req);
3189         if (ret != 0) {
3190                 D_ERR("database recovery failed, ret=%d\n", ret);
3191                 goto failed;
3192         }
3193
3194         sys_write(write_fd, &ret, sizeof(ret));
3195         return 0;
3196
3197 failed:
3198         TALLOC_FREE(mem_ctx);
3199         return 1;
3200 }