lib: relicense smb_strtoul(l) under LGPLv3
[samba.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
35 #include "lib/util/smb_strtox.h"
36
37 #include "protocol/protocol.h"
38 #include "protocol/protocol_api.h"
39 #include "client/client.h"
40
41 #include "common/logging.h"
42
/*
 * Retry budget.  The code that consumes it is not in this part of the
 * file.
 */
#define NUM_RETRIES     3

/*
 * Offset handed to timeval_current_ofs() by TIMEOUT(); presumably a
 * number of seconds -- confirm against lib/util/time.h.
 */
static int recover_timeout = 30;

/* Deadline used for the controls sent by this helper */
#define TIMEOUT()       timeval_current_ofs(recover_timeout, 0)
48
49 /*
50  * Utility functions
51  */
52
/*
 * Shared receive helper for requests that report unix error codes.
 *
 * Returns true when the request did not fail.  Otherwise returns false
 * and, if perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (!tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
66
67 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
68
69 static uint64_t srvid_next(void)
70 {
71         rec_srvid += 1;
72         return rec_srvid;
73 }
74
75 /*
76  * Node related functions
77  */
78
79 struct node_list {
80         uint32_t *pnn_list;
81         uint32_t *caps;
82         uint32_t *ban_credits;
83         unsigned int size;
84         unsigned int count;
85 };
86
87 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
88 {
89         struct node_list *nlist;
90         unsigned int i;
91
92         nlist = talloc_zero(mem_ctx, struct node_list);
93         if (nlist == NULL) {
94                 return NULL;
95         }
96
97         nlist->pnn_list = talloc_array(nlist, uint32_t, size);
98         nlist->caps = talloc_zero_array(nlist, uint32_t, size);
99         nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
100
101         if (nlist->pnn_list == NULL ||
102             nlist->caps == NULL ||
103             nlist->ban_credits == NULL) {
104                 talloc_free(nlist);
105                 return NULL;
106         }
107         nlist->size = size;
108
109         for (i=0; i<nlist->size; i++) {
110                 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
111         }
112
113         return nlist;
114 }
115
116 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
117 {
118         unsigned int i;
119
120         if (nlist->count == nlist->size) {
121                 return false;
122         }
123
124         for (i=0; i<nlist->count; i++) {
125                 if (nlist->pnn_list[i] == pnn) {
126                         return false;
127                 }
128         }
129
130         nlist->pnn_list[nlist->count] = pnn;
131         nlist->count += 1;
132
133         return true;
134 }
135
136 static uint32_t *node_list_lmaster(struct node_list *nlist,
137                                    TALLOC_CTX *mem_ctx,
138                                    unsigned int *pnn_count)
139 {
140         uint32_t *pnn_list;
141         unsigned int count, i;
142
143         pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
144         if (pnn_list == NULL) {
145                 return NULL;
146         }
147
148         count = 0;
149         for (i=0; i<nlist->count; i++) {
150                 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
151                         continue;
152                 }
153
154                 pnn_list[count] = nlist->pnn_list[i];
155                 count += 1;
156         }
157
158         *pnn_count = count;
159         return pnn_list;
160 }
161
162 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
163 {
164         unsigned int i;
165
166         for (i=0; i<nlist->count; i++) {
167                 if (nlist->pnn_list[i] == pnn) {
168                         nlist->ban_credits[i] += 1;
169                         break;
170                 }
171         }
172 }
173
174 /*
175  * Database list functions
176  *
177  * Simple, naive implementation that could be updated to a db_hash or similar
178  */
179
180 struct db {
181         struct db *prev, *next;
182
183         uint32_t db_id;
184         uint32_t db_flags;
185         uint32_t *pnn_list;
186         unsigned int num_nodes;
187 };
188
189 struct db_list {
190         unsigned int num_dbs;
191         struct db *db;
192         unsigned int num_nodes;
193 };
194
195 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
196 {
197         struct db_list *l;
198
199         l = talloc_zero(mem_ctx, struct db_list);
200         l->num_nodes = num_nodes;
201
202         return l;
203 }
204
205 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
206 {
207         struct db *db;
208
209         if (dblist == NULL) {
210                 return NULL;
211         }
212
213         db = dblist->db;
214         while (db != NULL && db->db_id != db_id) {
215                 db = db->next;
216         }
217
218         return db;
219 }
220
221 static int db_list_add(struct db_list *dblist,
222                        uint32_t db_id,
223                        uint32_t db_flags,
224                        uint32_t node)
225 {
226         struct db *db = NULL;
227
228         if (dblist == NULL) {
229                 return EINVAL;
230         }
231
232         db = talloc_zero(dblist, struct db);
233         if (db == NULL) {
234                 return ENOMEM;
235         }
236
237         db->db_id = db_id;
238         db->db_flags = db_flags;
239         db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
240         if (db->pnn_list == NULL) {
241                 talloc_free(db);
242                 return ENOMEM;
243         }
244         db->pnn_list[0] = node;
245         db->num_nodes = 1;
246
247         DLIST_ADD_END(dblist->db, db);
248         dblist->num_dbs++;
249
250         return 0;
251 }
252
253 static int db_list_check_and_add(struct db_list *dblist,
254                        uint32_t db_id,
255                        uint32_t db_flags,
256                        uint32_t node)
257 {
258         struct db *db = NULL;
259         int ret;
260
261         /*
262          * These flags are masked out because they are only set on a
263          * node when a client attaches to that node, so they might not
264          * be set yet.  They can't be passed as part of the attch, so
265          * they're no use here.
266          */
267         db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
268
269         if (dblist == NULL) {
270                 return EINVAL;
271         }
272
273         db = db_list_find(dblist, db_id);
274         if (db == NULL) {
275                 ret = db_list_add(dblist, db_id, db_flags, node);
276                 return ret;
277         }
278
279         if (db->db_flags != db_flags) {
280                 D_ERR("Incompatible database flags for 0x%"PRIx32" "
281                       "(0x%"PRIx32" != 0x%"PRIx32")\n",
282                       db_id,
283                       db_flags,
284                       db->db_flags);
285                 return EINVAL;
286         }
287
288         if (db->num_nodes >= dblist->num_nodes) {
289                 return EINVAL;
290         }
291
292         db->pnn_list[db->num_nodes] = node;
293         db->num_nodes++;
294
295         return 0;
296 }
297
298 /*
299  * Create database on nodes where it is missing
300  */
301
302 struct db_create_missing_state {
303         struct tevent_context *ev;
304         struct ctdb_client_context *client;
305
306         struct node_list *nlist;
307
308         const char *db_name;
309         uint32_t *missing_pnn_list;
310         int missing_num_nodes;
311 };
312
313 static void db_create_missing_done(struct tevent_req *subreq);
314
315 static struct tevent_req *db_create_missing_send(
316                                         TALLOC_CTX *mem_ctx,
317                                         struct tevent_context *ev,
318                                         struct ctdb_client_context *client,
319                                         struct node_list *nlist,
320                                         const char *db_name,
321                                         struct db *db)
322 {
323         struct tevent_req *req, *subreq;
324         struct db_create_missing_state *state;
325         struct ctdb_req_control request;
326         unsigned int i, j;
327
328         req = tevent_req_create(mem_ctx,
329                                 &state,
330                                 struct db_create_missing_state);
331         if (req == NULL) {
332                 return NULL;
333         }
334
335         state->ev = ev;
336         state->client = client;
337         state->nlist = nlist;
338         state->db_name = db_name;
339
340         if (nlist->count == db->num_nodes) {
341                 tevent_req_done(req);
342                 return tevent_req_post(req, ev);
343         }
344
345         state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
346         if (tevent_req_nomem(state->missing_pnn_list, req)) {
347                 return tevent_req_post(req, ev);
348         }
349
350         for (i = 0; i < nlist->count; i++) {
351                 uint32_t pnn = nlist->pnn_list[i] ;
352
353                 for (j = 0; j < db->num_nodes; j++) {
354                         if (pnn == db->pnn_list[j]) {
355                                 break;
356                         }
357                 }
358
359                 if (j < db->num_nodes) {
360                         continue;
361                 }
362
363                 DBG_INFO("Create database %s on node %u\n",
364                          state->db_name,
365                          pnn);
366                 state->missing_pnn_list[state->missing_num_nodes] = pnn;
367                 state->missing_num_nodes++;
368         }
369
370         if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
371                 ctdb_req_control_db_attach_persistent(&request, db_name);
372         } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
373                 ctdb_req_control_db_attach_replicated(&request, db_name);
374         } else {
375                 ctdb_req_control_db_attach(&request, db_name);
376         }
377         request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
378         subreq = ctdb_client_control_multi_send(state,
379                                                 state->ev,
380                                                 state->client,
381                                                 state->missing_pnn_list,
382                                                 state->missing_num_nodes,
383                                                 TIMEOUT(),
384                                                 &request);
385         if (tevent_req_nomem(subreq, req)) {
386                 return tevent_req_post(req, ev);
387         }
388         tevent_req_set_callback(subreq, db_create_missing_done, req);
389
390         return req;
391 }
392
393 static void db_create_missing_done(struct tevent_req *subreq)
394 {
395         struct tevent_req *req = tevent_req_callback_data(
396                 subreq, struct tevent_req);
397         struct db_create_missing_state *state = tevent_req_data(
398                 req, struct db_create_missing_state);
399         int *err_list;
400         int ret;
401         bool status;
402
403         status = ctdb_client_control_multi_recv(subreq,
404                                                 &ret,
405                                                 NULL,
406                                                 &err_list,
407                                                 NULL);
408         TALLOC_FREE(subreq);
409         if (! status) {
410                 int ret2;
411                 uint32_t pnn;
412
413                 ret2 = ctdb_client_control_multi_error(
414                                                 state->missing_pnn_list,
415                                                 state->missing_num_nodes,
416                                                 err_list,
417                                                 &pnn);
418                 if (ret2 != 0) {
419                         D_ERR("control DB_ATTACH failed for db %s"
420                               " on node %u, ret=%d\n",
421                               state->db_name,
422                               pnn,
423                               ret2);
424                         node_list_ban_credits(state->nlist, pnn);
425                 } else {
426                         D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
427                               state->db_name,
428                               ret);
429                 }
430                 tevent_req_error(req, ret);
431                 return;
432         }
433
434         tevent_req_done(req);
435 }
436
/*
 * Collect the result of db_create_missing_send(); see generic_recv()
 * for the meaning of the return value and perr.
 */
static bool db_create_missing_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
441
442 /*
443  * Recovery database functions
444  */
445
446 struct recdb_context {
447         uint32_t db_id;
448         const char *db_name;
449         const char *db_path;
450         struct tdb_wrap *db;
451         bool persistent;
452 };
453
454 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
455                                           const char *db_name,
456                                           const char *db_path,
457                                           uint32_t hash_size, bool persistent)
458 {
459         static char *db_dir_state = NULL;
460         struct recdb_context *recdb;
461         unsigned int tdb_flags;
462
463         recdb = talloc(mem_ctx, struct recdb_context);
464         if (recdb == NULL) {
465                 return NULL;
466         }
467
468         if (db_dir_state == NULL) {
469                 db_dir_state = getenv("CTDB_DBDIR_STATE");
470         }
471
472         recdb->db_name = db_name;
473         recdb->db_id = db_id;
474         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
475                                          db_dir_state != NULL ?
476                                             db_dir_state :
477                                             dirname(discard_const(db_path)),
478                                          db_name);
479         if (recdb->db_path == NULL) {
480                 talloc_free(recdb);
481                 return NULL;
482         }
483         unlink(recdb->db_path);
484
485         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
486         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
487                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
488         if (recdb->db == NULL) {
489                 talloc_free(recdb);
490                 D_ERR("failed to create recovery db %s\n", recdb->db_path);
491                 return NULL;
492         }
493
494         recdb->persistent = persistent;
495
496         return recdb;
497 }
498
499 static uint32_t recdb_id(struct recdb_context *recdb)
500 {
501         return recdb->db_id;
502 }
503
504 static const char *recdb_name(struct recdb_context *recdb)
505 {
506         return recdb->db_name;
507 }
508
509 static const char *recdb_path(struct recdb_context *recdb)
510 {
511         return recdb->db_path;
512 }
513
514 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
515 {
516         return recdb->db->tdb;
517 }
518
519 static bool recdb_persistent(struct recdb_context *recdb)
520 {
521         return recdb->persistent;
522 }
523
524 struct recdb_add_traverse_state {
525         struct recdb_context *recdb;
526         uint32_t mypnn;
527 };
528
529 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
530                               TDB_DATA key, TDB_DATA data,
531                               void *private_data)
532 {
533         struct recdb_add_traverse_state *state =
534                 (struct recdb_add_traverse_state *)private_data;
535         struct ctdb_ltdb_header *hdr;
536         TDB_DATA prev_data;
537         int ret;
538
539         /* header is not marshalled separately in the pulldb control */
540         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
541                 return -1;
542         }
543
544         hdr = (struct ctdb_ltdb_header *)data.dptr;
545
546         /* fetch the existing record, if any */
547         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
548
549         if (prev_data.dptr != NULL) {
550                 struct ctdb_ltdb_header prev_hdr;
551
552                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
553                 free(prev_data.dptr);
554                 if (hdr->rsn < prev_hdr.rsn ||
555                     (hdr->rsn == prev_hdr.rsn &&
556                      prev_hdr.dmaster != state->mypnn)) {
557                         return 0;
558                 }
559         }
560
561         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
562         if (ret != 0) {
563                 return -1;
564         }
565         return 0;
566 }
567
568 static bool recdb_add(struct recdb_context *recdb, int mypnn,
569                       struct ctdb_rec_buffer *recbuf)
570 {
571         struct recdb_add_traverse_state state;
572         int ret;
573
574         state.recdb = recdb;
575         state.mypnn = mypnn;
576
577         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
578         if (ret != 0) {
579                 return false;
580         }
581
582         return true;
583 }
584
585 /* This function decides which records from recdb are retained */
586 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
587                              uint32_t reqid, uint32_t dmaster,
588                              TDB_DATA key, TDB_DATA data)
589 {
590         struct ctdb_ltdb_header *header;
591         int ret;
592
593         /* Skip empty records */
594         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
595                 return 0;
596         }
597
598         /* update the dmaster field to point to us */
599         header = (struct ctdb_ltdb_header *)data.dptr;
600         if (!persistent) {
601                 header->dmaster = dmaster;
602                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
603         }
604
605         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
606         if (ret != 0) {
607                 return ret;
608         }
609
610         return 0;
611 }
612
613 struct recdb_records_traverse_state {
614         struct ctdb_rec_buffer *recbuf;
615         uint32_t dmaster;
616         uint32_t reqid;
617         bool persistent;
618         bool failed;
619 };
620
621 static int recdb_records_traverse(struct tdb_context *tdb,
622                                   TDB_DATA key, TDB_DATA data,
623                                   void *private_data)
624 {
625         struct recdb_records_traverse_state *state =
626                 (struct recdb_records_traverse_state *)private_data;
627         int ret;
628
629         ret = recbuf_filter_add(state->recbuf, state->persistent,
630                                 state->reqid, state->dmaster, key, data);
631         if (ret != 0) {
632                 state->failed = true;
633                 return ret;
634         }
635
636         return 0;
637 }
638
639 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
640                                              TALLOC_CTX *mem_ctx,
641                                              uint32_t dmaster)
642 {
643         struct recdb_records_traverse_state state;
644         int ret;
645
646         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
647         if (state.recbuf == NULL) {
648                 return NULL;
649         }
650         state.dmaster = dmaster;
651         state.reqid = 0;
652         state.persistent = recdb_persistent(recdb);
653         state.failed = false;
654
655         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
656                                 &state);
657         if (ret == -1 || state.failed) {
658                 D_ERR("Failed to marshall recovery records for %s\n",
659                       recdb_name(recdb));
660                 TALLOC_FREE(state.recbuf);
661                 return NULL;
662         }
663
664         return state.recbuf;
665 }
666
667 struct recdb_file_traverse_state {
668         struct ctdb_rec_buffer *recbuf;
669         struct recdb_context *recdb;
670         TALLOC_CTX *mem_ctx;
671         uint32_t dmaster;
672         uint32_t reqid;
673         bool persistent;
674         bool failed;
675         int fd;
676         size_t max_size;
677         unsigned int num_buffers;
678 };
679
680 static int recdb_file_traverse(struct tdb_context *tdb,
681                                TDB_DATA key, TDB_DATA data,
682                                void *private_data)
683 {
684         struct recdb_file_traverse_state *state =
685                 (struct recdb_file_traverse_state *)private_data;
686         int ret;
687
688         ret = recbuf_filter_add(state->recbuf, state->persistent,
689                                 state->reqid, state->dmaster, key, data);
690         if (ret != 0) {
691                 state->failed = true;
692                 return ret;
693         }
694
695         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
696                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
697                 if (ret != 0) {
698                         D_ERR("Failed to collect recovery records for %s\n",
699                               recdb_name(state->recdb));
700                         state->failed = true;
701                         return ret;
702                 }
703
704                 state->num_buffers += 1;
705
706                 TALLOC_FREE(state->recbuf);
707                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
708                                                      recdb_id(state->recdb));
709                 if (state->recbuf == NULL) {
710                         state->failed = true;
711                         return ENOMEM;
712                 }
713         }
714
715         return 0;
716 }
717
718 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
719                       uint32_t dmaster, int fd, int max_size)
720 {
721         struct recdb_file_traverse_state state;
722         int ret;
723
724         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
725         if (state.recbuf == NULL) {
726                 return -1;
727         }
728         state.recdb = recdb;
729         state.mem_ctx = mem_ctx;
730         state.dmaster = dmaster;
731         state.reqid = 0;
732         state.persistent = recdb_persistent(recdb);
733         state.failed = false;
734         state.fd = fd;
735         state.max_size = max_size;
736         state.num_buffers = 0;
737
738         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
739         if (ret == -1 || state.failed) {
740                 TALLOC_FREE(state.recbuf);
741                 return -1;
742         }
743
744         ret = ctdb_rec_buffer_write(state.recbuf, fd);
745         if (ret != 0) {
746                 D_ERR("Failed to collect recovery records for %s\n",
747                       recdb_name(recdb));
748                 TALLOC_FREE(state.recbuf);
749                 return -1;
750         }
751         state.num_buffers += 1;
752
753         D_DEBUG("Wrote %d buffers of recovery records for %s\n",
754                 state.num_buffers, recdb_name(recdb));
755
756         return state.num_buffers;
757 }
758
759 /*
760  * Pull database from a single node
761  */
762
763 struct pull_database_state {
764         struct tevent_context *ev;
765         struct ctdb_client_context *client;
766         struct recdb_context *recdb;
767         uint32_t pnn;
768         uint64_t srvid;
769         unsigned int num_records;
770         int result;
771 };
772
773 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
774                                   void *private_data);
775 static void pull_database_register_done(struct tevent_req *subreq);
776 static void pull_database_old_done(struct tevent_req *subreq);
777 static void pull_database_unregister_done(struct tevent_req *subreq);
778 static void pull_database_new_done(struct tevent_req *subreq);
779
780 static struct tevent_req *pull_database_send(
781                         TALLOC_CTX *mem_ctx,
782                         struct tevent_context *ev,
783                         struct ctdb_client_context *client,
784                         uint32_t pnn, uint32_t caps,
785                         struct recdb_context *recdb)
786 {
787         struct tevent_req *req, *subreq;
788         struct pull_database_state *state;
789         struct ctdb_req_control request;
790
791         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
792         if (req == NULL) {
793                 return NULL;
794         }
795
796         state->ev = ev;
797         state->client = client;
798         state->recdb = recdb;
799         state->pnn = pnn;
800         state->srvid = srvid_next();
801
802         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
803                 subreq = ctdb_client_set_message_handler_send(
804                                         state, state->ev, state->client,
805                                         state->srvid, pull_database_handler,
806                                         req);
807                 if (tevent_req_nomem(subreq, req)) {
808                         return tevent_req_post(req, ev);
809                 }
810
811                 tevent_req_set_callback(subreq, pull_database_register_done,
812                                         req);
813
814         } else {
815                 struct ctdb_pulldb pulldb;
816
817                 pulldb.db_id = recdb_id(recdb);
818                 pulldb.lmaster = CTDB_LMASTER_ANY;
819
820                 ctdb_req_control_pull_db(&request, &pulldb);
821                 subreq = ctdb_client_control_send(state, state->ev,
822                                                   state->client,
823                                                   pnn, TIMEOUT(),
824                                                   &request);
825                 if (tevent_req_nomem(subreq, req)) {
826                         return tevent_req_post(req, ev);
827                 }
828                 tevent_req_set_callback(subreq, pull_database_old_done, req);
829         }
830
831         return req;
832 }
833
834 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
835                                   void *private_data)
836 {
837         struct tevent_req *req = talloc_get_type_abort(
838                 private_data, struct tevent_req);
839         struct pull_database_state *state = tevent_req_data(
840                 req, struct pull_database_state);
841         struct ctdb_rec_buffer *recbuf;
842         size_t np;
843         int ret;
844         bool status;
845
846         if (srvid != state->srvid) {
847                 return;
848         }
849
850         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
851         if (ret != 0) {
852                 D_ERR("Invalid data received for DB_PULL messages\n");
853                 return;
854         }
855
856         if (recbuf->db_id != recdb_id(state->recdb)) {
857                 talloc_free(recbuf);
858                 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
859                       recbuf->db_id, recdb_name(state->recdb));
860                 return;
861         }
862
863         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
864                            recbuf);
865         if (! status) {
866                 talloc_free(recbuf);
867                 D_ERR("Failed to add records to recdb for %s\n",
868                       recdb_name(state->recdb));
869                 return;
870         }
871
872         state->num_records += recbuf->count;
873         talloc_free(recbuf);
874 }
875
876 static void pull_database_register_done(struct tevent_req *subreq)
877 {
878         struct tevent_req *req = tevent_req_callback_data(
879                 subreq, struct tevent_req);
880         struct pull_database_state *state = tevent_req_data(
881                 req, struct pull_database_state);
882         struct ctdb_req_control request;
883         struct ctdb_pulldb_ext pulldb_ext;
884         int ret;
885         bool status;
886
887         status = ctdb_client_set_message_handler_recv(subreq, &ret);
888         TALLOC_FREE(subreq);
889         if (! status) {
890                 D_ERR("Failed to set message handler for DB_PULL for %s\n",
891                       recdb_name(state->recdb));
892                 tevent_req_error(req, ret);
893                 return;
894         }
895
896         pulldb_ext.db_id = recdb_id(state->recdb);
897         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
898         pulldb_ext.srvid = state->srvid;
899
900         ctdb_req_control_db_pull(&request, &pulldb_ext);
901         subreq = ctdb_client_control_send(state, state->ev, state->client,
902                                           state->pnn, TIMEOUT(), &request);
903         if (tevent_req_nomem(subreq, req)) {
904                 return;
905         }
906         tevent_req_set_callback(subreq, pull_database_new_done, req);
907 }
908
909 static void pull_database_old_done(struct tevent_req *subreq)
910 {
911         struct tevent_req *req = tevent_req_callback_data(
912                 subreq, struct tevent_req);
913         struct pull_database_state *state = tevent_req_data(
914                 req, struct pull_database_state);
915         struct ctdb_reply_control *reply;
916         struct ctdb_rec_buffer *recbuf;
917         int ret;
918         bool status;
919
920         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
921         TALLOC_FREE(subreq);
922         if (! status) {
923                 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
924                       recdb_name(state->recdb), state->pnn, ret);
925                 tevent_req_error(req, ret);
926                 return;
927         }
928
929         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
930         talloc_free(reply);
931         if (ret != 0) {
932                 tevent_req_error(req, ret);
933                 return;
934         }
935
936         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
937                            recbuf);
938         if (! status) {
939                 talloc_free(recbuf);
940                 tevent_req_error(req, EIO);
941                 return;
942         }
943
944         state->num_records = recbuf->count;
945         talloc_free(recbuf);
946
947         D_INFO("Pulled %d records for db %s from node %d\n",
948                state->num_records, recdb_name(state->recdb), state->pnn);
949
950         tevent_req_done(req);
951 }
952
953 static void pull_database_new_done(struct tevent_req *subreq)
954 {
955         struct tevent_req *req = tevent_req_callback_data(
956                 subreq, struct tevent_req);
957         struct pull_database_state *state = tevent_req_data(
958                 req, struct pull_database_state);
959         struct ctdb_reply_control *reply;
960         uint32_t num_records;
961         int ret;
962         bool status;
963
964         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
965         TALLOC_FREE(subreq);
966         if (! status) {
967                 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
968                       recdb_name(state->recdb), state->pnn, ret);
969                 state->result = ret;
970                 goto unregister;
971         }
972
973         ret = ctdb_reply_control_db_pull(reply, &num_records);
974         talloc_free(reply);
975         if (num_records != state->num_records) {
976                 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
977                       num_records, state->num_records,
978                       recdb_name(state->recdb));
979                 state->result = EIO;
980                 goto unregister;
981         }
982
983         D_INFO("Pulled %d records for db %s from node %d\n",
984                state->num_records, recdb_name(state->recdb), state->pnn);
985
986 unregister:
987
988         subreq = ctdb_client_remove_message_handler_send(
989                                         state, state->ev, state->client,
990                                         state->srvid, req);
991         if (tevent_req_nomem(subreq, req)) {
992                 return;
993         }
994         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
995 }
996
997 static void pull_database_unregister_done(struct tevent_req *subreq)
998 {
999         struct tevent_req *req = tevent_req_callback_data(
1000                 subreq, struct tevent_req);
1001         struct pull_database_state *state = tevent_req_data(
1002                 req, struct pull_database_state);
1003         int ret;
1004         bool status;
1005
1006         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1007         TALLOC_FREE(subreq);
1008         if (! status) {
1009                 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
1010                       recdb_name(state->recdb));
1011                 tevent_req_error(req, ret);
1012                 return;
1013         }
1014
1015         if (state->result != 0) {
1016                 tevent_req_error(req, state->result);
1017                 return;
1018         }
1019
1020         tevent_req_done(req);
1021 }
1022
/*
 * Collect the result of a pull_database request.
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1027
1028 /*
1029  * Push database to specified nodes (old style)
1030  */
1031
/*
 * Request state for the old-style push, which sends one PUSH_DB control
 * per node, one node after the other.
 */
1032 struct push_database_old_state {
1033         struct tevent_context *ev;
1034         struct ctdb_client_context *client;
1035         struct recdb_context *recdb;
1036         uint32_t *pnn_list; /* nodes to push to, pointer stored as passed in */
1037         unsigned int count; /* number of entries in pnn_list */
1038         struct ctdb_rec_buffer *recbuf; /* records sent to every node, freed after the last push */
1039         unsigned int index; /* index in pnn_list of the node currently being pushed to */
1040 };
1041
1042 static void push_database_old_push_done(struct tevent_req *subreq);
1043
1044 static struct tevent_req *push_database_old_send(
1045                         TALLOC_CTX *mem_ctx,
1046                         struct tevent_context *ev,
1047                         struct ctdb_client_context *client,
1048                         uint32_t *pnn_list,
1049                         unsigned int count,
1050                         struct recdb_context *recdb)
1051 {
1052         struct tevent_req *req, *subreq;
1053         struct push_database_old_state *state;
1054         struct ctdb_req_control request;
1055         uint32_t pnn;
1056
1057         req = tevent_req_create(mem_ctx, &state,
1058                                 struct push_database_old_state);
1059         if (req == NULL) {
1060                 return NULL;
1061         }
1062
1063         state->ev = ev;
1064         state->client = client;
1065         state->recdb = recdb;
1066         state->pnn_list = pnn_list;
1067         state->count = count;
1068         state->index = 0;
1069
1070         state->recbuf = recdb_records(recdb, state,
1071                                       ctdb_client_pnn(client));
1072         if (tevent_req_nomem(state->recbuf, req)) {
1073                 return tevent_req_post(req, ev);
1074         }
1075
1076         pnn = state->pnn_list[state->index];
1077
1078         ctdb_req_control_push_db(&request, state->recbuf);
1079         subreq = ctdb_client_control_send(state, ev, client, pnn,
1080                                           TIMEOUT(), &request);
1081         if (tevent_req_nomem(subreq, req)) {
1082                 return tevent_req_post(req, ev);
1083         }
1084         tevent_req_set_callback(subreq, push_database_old_push_done, req);
1085
1086         return req;
1087 }
1088
1089 static void push_database_old_push_done(struct tevent_req *subreq)
1090 {
1091         struct tevent_req *req = tevent_req_callback_data(
1092                 subreq, struct tevent_req);
1093         struct push_database_old_state *state = tevent_req_data(
1094                 req, struct push_database_old_state);
1095         struct ctdb_req_control request;
1096         uint32_t pnn;
1097         int ret;
1098         bool status;
1099
1100         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1101         TALLOC_FREE(subreq);
1102         if (! status) {
1103                 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
1104                       recdb_name(state->recdb), state->pnn_list[state->index],
1105                       ret);
1106                 tevent_req_error(req, ret);
1107                 return;
1108         }
1109
1110         state->index += 1;
1111         if (state->index == state->count) {
1112                 TALLOC_FREE(state->recbuf);
1113                 tevent_req_done(req);
1114                 return;
1115         }
1116
1117         pnn = state->pnn_list[state->index];
1118
1119         ctdb_req_control_push_db(&request, state->recbuf);
1120         subreq = ctdb_client_control_send(state, state->ev, state->client,
1121                                           pnn, TIMEOUT(), &request);
1122         if (tevent_req_nomem(subreq, req)) {
1123                 return;
1124         }
1125         tevent_req_set_callback(subreq, push_database_old_push_done, req);
1126 }
1127
/*
 * Collect the result of an old-style push request.
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1132
1133 /*
1134  * Push database to specified nodes (new style)
1135  */
1136
/*
 * Request state for the new-style push: DB_PUSH_START control, then the
 * record buffers as messages, then a DB_PUSH_CONFIRM control.
 */
1137 struct push_database_new_state {
1138         struct tevent_context *ev;
1139         struct ctdb_client_context *client;
1140         struct recdb_context *recdb;
1141         uint32_t *pnn_list; /* nodes to push to, pointer stored as passed in */
1142         unsigned int count; /* number of entries in pnn_list */
1143         uint64_t srvid; /* from srvid_next(); used in DB_PUSH_START and as message srvid */
1144         uint32_t dmaster; /* ctdb_client_pnn() of the client, passed to recdb_file() */
1145         int fd; /* "<recdb path>.dat" file holding the buffers, unlinked right after open */
1146         int num_buffers; /* value returned by recdb_file() */
1147         int num_buffers_sent; /* buffers whose message send has completed */
1148         unsigned int num_records; /* sum of recbuf->count over the buffers sent so far */
1149 };
1150
1151 static void push_database_new_started(struct tevent_req *subreq);
1152 static void push_database_new_send_msg(struct tevent_req *req);
1153 static void push_database_new_send_done(struct tevent_req *subreq);
1154 static void push_database_new_confirmed(struct tevent_req *subreq);
1155
1156 static struct tevent_req *push_database_new_send(
1157                         TALLOC_CTX *mem_ctx,
1158                         struct tevent_context *ev,
1159                         struct ctdb_client_context *client,
1160                         uint32_t *pnn_list,
1161                         unsigned int count,
1162                         struct recdb_context *recdb,
1163                         int max_size)
1164 {
1165         struct tevent_req *req, *subreq;
1166         struct push_database_new_state *state;
1167         struct ctdb_req_control request;
1168         struct ctdb_pulldb_ext pulldb_ext;
1169         char *filename;
1170         off_t offset;
1171
1172         req = tevent_req_create(mem_ctx, &state,
1173                                 struct push_database_new_state);
1174         if (req == NULL) {
1175                 return NULL;
1176         }
1177
1178         state->ev = ev;
1179         state->client = client;
1180         state->recdb = recdb;
1181         state->pnn_list = pnn_list;
1182         state->count = count;
1183
1184         state->srvid = srvid_next();
1185         state->dmaster = ctdb_client_pnn(client);
1186         state->num_buffers_sent = 0;
1187         state->num_records = 0;
1188
1189         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
1190         if (tevent_req_nomem(filename, req)) {
1191                 return tevent_req_post(req, ev);
1192         }
1193
1194         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
1195         if (state->fd == -1) {
1196                 tevent_req_error(req, errno);
1197                 return tevent_req_post(req, ev);
1198         }
1199         unlink(filename);
1200         talloc_free(filename);
1201
1202         state->num_buffers = recdb_file(recdb, state, state->dmaster,
1203                                         state->fd, max_size);
1204         if (state->num_buffers == -1) {
1205                 tevent_req_error(req, ENOMEM);
1206                 return tevent_req_post(req, ev);
1207         }
1208
1209         offset = lseek(state->fd, 0, SEEK_SET);
1210         if (offset != 0) {
1211                 tevent_req_error(req, EIO);
1212                 return tevent_req_post(req, ev);
1213         }
1214
1215         pulldb_ext.db_id = recdb_id(recdb);
1216         pulldb_ext.srvid = state->srvid;
1217
1218         ctdb_req_control_db_push_start(&request, &pulldb_ext);
1219         subreq = ctdb_client_control_multi_send(state, ev, client,
1220                                                 pnn_list, count,
1221                                                 TIMEOUT(), &request);
1222         if (tevent_req_nomem(subreq, req)) {
1223                 return tevent_req_post(req, ev);
1224         }
1225         tevent_req_set_callback(subreq, push_database_new_started, req);
1226
1227         return req;
1228 }
1229
1230 static void push_database_new_started(struct tevent_req *subreq)
1231 {
1232         struct tevent_req *req = tevent_req_callback_data(
1233                 subreq, struct tevent_req);
1234         struct push_database_new_state *state = tevent_req_data(
1235                 req, struct push_database_new_state);
1236         int *err_list;
1237         int ret;
1238         bool status;
1239
1240         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1241                                                 &err_list, NULL);
1242         TALLOC_FREE(subreq);
1243         if (! status) {
1244                 int ret2;
1245                 uint32_t pnn;
1246
1247                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1248                                                        state->count,
1249                                                        err_list, &pnn);
1250                 if (ret2 != 0) {
1251                         D_ERR("control DB_PUSH_START failed for db %s"
1252                               " on node %u, ret=%d\n",
1253                               recdb_name(state->recdb), pnn, ret2);
1254                 } else {
1255                         D_ERR("control DB_PUSH_START failed for db %s,"
1256                               " ret=%d\n",
1257                               recdb_name(state->recdb), ret);
1258                 }
1259                 talloc_free(err_list);
1260
1261                 tevent_req_error(req, ret);
1262                 return;
1263         }
1264
1265         push_database_new_send_msg(req);
1266 }
1267
1268 static void push_database_new_send_msg(struct tevent_req *req)
1269 {
1270         struct push_database_new_state *state = tevent_req_data(
1271                 req, struct push_database_new_state);
1272         struct tevent_req *subreq;
1273         struct ctdb_rec_buffer *recbuf;
1274         struct ctdb_req_message message;
1275         TDB_DATA data;
1276         size_t np;
1277         int ret;
1278
1279         if (state->num_buffers_sent == state->num_buffers) {
1280                 struct ctdb_req_control request;
1281
1282                 ctdb_req_control_db_push_confirm(&request,
1283                                                  recdb_id(state->recdb));
1284                 subreq = ctdb_client_control_multi_send(state, state->ev,
1285                                                         state->client,
1286                                                         state->pnn_list,
1287                                                         state->count,
1288                                                         TIMEOUT(), &request);
1289                 if (tevent_req_nomem(subreq, req)) {
1290                         return;
1291                 }
1292                 tevent_req_set_callback(subreq, push_database_new_confirmed,
1293                                         req);
1294                 return;
1295         }
1296
1297         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1298         if (ret != 0) {
1299                 tevent_req_error(req, ret);
1300                 return;
1301         }
1302
1303         data.dsize = ctdb_rec_buffer_len(recbuf);
1304         data.dptr = talloc_size(state, data.dsize);
1305         if (tevent_req_nomem(data.dptr, req)) {
1306                 return;
1307         }
1308
1309         ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1310
1311         message.srvid = state->srvid;
1312         message.data.data = data;
1313
1314         D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1315                 state->num_buffers_sent, recbuf->count,
1316                 recdb_name(state->recdb));
1317
1318         subreq = ctdb_client_message_multi_send(state, state->ev,
1319                                                 state->client,
1320                                                 state->pnn_list, state->count,
1321                                                 &message);
1322         if (tevent_req_nomem(subreq, req)) {
1323                 return;
1324         }
1325         tevent_req_set_callback(subreq, push_database_new_send_done, req);
1326
1327         state->num_records += recbuf->count;
1328
1329         talloc_free(data.dptr);
1330         talloc_free(recbuf);
1331 }
1332
1333 static void push_database_new_send_done(struct tevent_req *subreq)
1334 {
1335         struct tevent_req *req = tevent_req_callback_data(
1336                 subreq, struct tevent_req);
1337         struct push_database_new_state *state = tevent_req_data(
1338                 req, struct push_database_new_state);
1339         bool status;
1340         int ret;
1341
1342         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1343         TALLOC_FREE(subreq);
1344         if (! status) {
1345                 D_ERR("Sending recovery records failed for %s\n",
1346                       recdb_name(state->recdb));
1347                 tevent_req_error(req, ret);
1348                 return;
1349         }
1350
1351         state->num_buffers_sent += 1;
1352
1353         push_database_new_send_msg(req);
1354 }
1355
1356 static void push_database_new_confirmed(struct tevent_req *subreq)
1357 {
1358         struct tevent_req *req = tevent_req_callback_data(
1359                 subreq, struct tevent_req);
1360         struct push_database_new_state *state = tevent_req_data(
1361                 req, struct push_database_new_state);
1362         struct ctdb_reply_control **reply;
1363         int *err_list;
1364         bool status;
1365         unsigned int i;
1366         int ret;
1367         uint32_t num_records;
1368
1369         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1370                                                 &err_list, &reply);
1371         TALLOC_FREE(subreq);
1372         if (! status) {
1373                 int ret2;
1374                 uint32_t pnn;
1375
1376                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1377                                                        state->count, err_list,
1378                                                        &pnn);
1379                 if (ret2 != 0) {
1380                         D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1381                               " on node %u, ret=%d\n",
1382                               recdb_name(state->recdb), pnn, ret2);
1383                 } else {
1384                         D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1385                               " ret=%d\n",
1386                               recdb_name(state->recdb), ret);
1387                 }
1388                 tevent_req_error(req, ret);
1389                 return;
1390         }
1391
1392         for (i=0; i<state->count; i++) {
1393                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1394                                                          &num_records);
1395                 if (ret != 0) {
1396                         tevent_req_error(req, EPROTO);
1397                         return;
1398                 }
1399
1400                 if (num_records != state->num_records) {
1401                         D_ERR("Node %u received %d of %d records for %s\n",
1402                               state->pnn_list[i], num_records,
1403                               state->num_records, recdb_name(state->recdb));
1404                         tevent_req_error(req, EPROTO);
1405                         return;
1406                 }
1407         }
1408
1409         talloc_free(reply);
1410
1411         D_INFO("Pushed %d records for db %s\n",
1412                state->num_records, recdb_name(state->recdb));
1413
1414         tevent_req_done(req);
1415 }
1416
/*
 * Collect the result of a new-style push request.
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1421
1422 /*
1423  * wrapper for push_database_old and push_database_new
1424  */
1425
/*
 * State for the wrapper request: each flag is set when the corresponding
 * push (old-style / new-style) has completed or was not needed at all.
 */
1426 struct push_database_state {
1427         bool old_done, new_done;
1428 };
1429
1430 static void push_database_old_done(struct tevent_req *subreq);
1431 static void push_database_new_done(struct tevent_req *subreq);
1432
1433 static struct tevent_req *push_database_send(
1434                         TALLOC_CTX *mem_ctx,
1435                         struct tevent_context *ev,
1436                         struct ctdb_client_context *client,
1437                         struct node_list *nlist,
1438                         struct ctdb_tunable_list *tun_list,
1439                         struct recdb_context *recdb)
1440 {
1441         struct tevent_req *req, *subreq;
1442         struct push_database_state *state;
1443         uint32_t *old_list, *new_list;
1444         unsigned int old_count, new_count;
1445         unsigned int i;
1446
1447         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1448         if (req == NULL) {
1449                 return NULL;
1450         }
1451
1452         state->old_done = false;
1453         state->new_done = false;
1454
1455         old_count = 0;
1456         new_count = 0;
1457         old_list = talloc_array(state, uint32_t, nlist->count);
1458         new_list = talloc_array(state, uint32_t, nlist->count);
1459         if (tevent_req_nomem(old_list, req) ||
1460             tevent_req_nomem(new_list,req)) {
1461                 return tevent_req_post(req, ev);
1462         }
1463
1464         for (i=0; i<nlist->count; i++) {
1465                 if (nlist->caps[i] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1466                         new_list[new_count] = nlist->pnn_list[i];
1467                         new_count += 1;
1468                 } else {
1469                         old_list[old_count] = nlist->pnn_list[i];
1470                         old_count += 1;
1471                 }
1472         }
1473
1474         if (old_count > 0) {
1475                 subreq = push_database_old_send(state, ev, client,
1476                                                 old_list, old_count, recdb);
1477                 if (tevent_req_nomem(subreq, req)) {
1478                         return tevent_req_post(req, ev);
1479                 }
1480                 tevent_req_set_callback(subreq, push_database_old_done, req);
1481         } else {
1482                 state->old_done = true;
1483         }
1484
1485         if (new_count > 0) {
1486                 subreq = push_database_new_send(state, ev, client,
1487                                                 new_list, new_count, recdb,
1488                                                 tun_list->rec_buffer_size_limit);
1489                 if (tevent_req_nomem(subreq, req)) {
1490                         return tevent_req_post(req, ev);
1491                 }
1492                 tevent_req_set_callback(subreq, push_database_new_done, req);
1493         } else {
1494                 state->new_done = true;
1495         }
1496
1497         return req;
1498 }
1499
1500 static void push_database_old_done(struct tevent_req *subreq)
1501 {
1502         struct tevent_req *req = tevent_req_callback_data(
1503                 subreq, struct tevent_req);
1504         struct push_database_state *state = tevent_req_data(
1505                 req, struct push_database_state);
1506         bool status;
1507         int ret;
1508
1509         status = push_database_old_recv(subreq, &ret);
1510         if (! status) {
1511                 tevent_req_error(req, ret);
1512                 return;
1513         }
1514
1515         state->old_done = true;
1516
1517         if (state->old_done && state->new_done) {
1518                 tevent_req_done(req);
1519         }
1520 }
1521
1522 static void push_database_new_done(struct tevent_req *subreq)
1523 {
1524         struct tevent_req *req = tevent_req_callback_data(
1525                 subreq, struct tevent_req);
1526         struct push_database_state *state = tevent_req_data(
1527                 req, struct push_database_state);
1528         bool status;
1529         int ret;
1530
1531         status = push_database_new_recv(subreq, &ret);
1532         if (! status) {
1533                 tevent_req_error(req, ret);
1534                 return;
1535         }
1536
1537         state->new_done = true;
1538
1539         if (state->old_done && state->new_done) {
1540                 tevent_req_done(req);
1541         }
1542 }
1543
/*
 * Collect the result of a push_database request.
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1548
1549 /*
1550  * Collect databases using highest sequence number
1551  */
1552
/*
 * Request state for collecting a database from the single node that
 * reports the highest sequence number.
 */
1553 struct collect_highseqnum_db_state {
1554         struct tevent_context *ev;
1555         struct ctdb_client_context *client;
1556         struct node_list *nlist;
1557         uint32_t db_id;
1558         struct recdb_context *recdb;
1559
1560         uint32_t max_pnn; /* node with the highest seqnum; the db is pulled from it */
1561 };
1562
1563 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1564 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1565
1566 static struct tevent_req *collect_highseqnum_db_send(
1567                         TALLOC_CTX *mem_ctx,
1568                         struct tevent_context *ev,
1569                         struct ctdb_client_context *client,
1570                         struct node_list *nlist,
1571                         uint32_t db_id,
1572                         struct recdb_context *recdb)
1573 {
1574         struct tevent_req *req, *subreq;
1575         struct collect_highseqnum_db_state *state;
1576         struct ctdb_req_control request;
1577
1578         req = tevent_req_create(mem_ctx, &state,
1579                                 struct collect_highseqnum_db_state);
1580         if (req == NULL) {
1581                 return NULL;
1582         }
1583
1584         state->ev = ev;
1585         state->client = client;
1586         state->nlist = nlist;
1587         state->db_id = db_id;
1588         state->recdb = recdb;
1589
1590         ctdb_req_control_get_db_seqnum(&request, db_id);
1591         subreq = ctdb_client_control_multi_send(mem_ctx,
1592                                                 ev,
1593                                                 client,
1594                                                 nlist->pnn_list,
1595                                                 nlist->count,
1596                                                 TIMEOUT(),
1597                                                 &request);
1598         if (tevent_req_nomem(subreq, req)) {
1599                 return tevent_req_post(req, ev);
1600         }
1601         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1602                                 req);
1603
1604         return req;
1605 }
1606
1607 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1608 {
1609         struct tevent_req *req = tevent_req_callback_data(
1610                 subreq, struct tevent_req);
1611         struct collect_highseqnum_db_state *state = tevent_req_data(
1612                 req, struct collect_highseqnum_db_state);
1613         struct ctdb_reply_control **reply;
1614         int *err_list;
1615         bool status;
1616         unsigned int i;
1617         int ret;
1618         uint64_t seqnum, max_seqnum;
1619         uint32_t max_caps;
1620
1621         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1622                                                 &err_list, &reply);
1623         TALLOC_FREE(subreq);
1624         if (! status) {
1625                 int ret2;
1626                 uint32_t pnn;
1627
1628                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1629                                                        state->nlist->count,
1630                                                        err_list,
1631                                                        &pnn);
1632                 if (ret2 != 0) {
1633                         D_ERR("control GET_DB_SEQNUM failed for db %s"
1634                               " on node %u, ret=%d\n",
1635                               recdb_name(state->recdb), pnn, ret2);
1636                 } else {
1637                         D_ERR("control GET_DB_SEQNUM failed for db %s,"
1638                               " ret=%d\n",
1639                               recdb_name(state->recdb), ret);
1640                 }
1641                 tevent_req_error(req, ret);
1642                 return;
1643         }
1644
1645         max_seqnum = 0;
1646         state->max_pnn = state->nlist->pnn_list[0];
1647         max_caps = state->nlist->caps[0];
1648         for (i=0; i<state->nlist->count; i++) {
1649                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1650                 if (ret != 0) {
1651                         tevent_req_error(req, EPROTO);
1652                         return;
1653                 }
1654
1655                 if (max_seqnum < seqnum) {
1656                         max_seqnum = seqnum;
1657                         state->max_pnn = state->nlist->pnn_list[i];
1658                         max_caps = state->nlist->caps[i];
1659                 }
1660         }
1661
1662         talloc_free(reply);
1663
1664         D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1665                recdb_name(state->recdb), state->max_pnn, max_seqnum);
1666
1667         subreq = pull_database_send(state,
1668                                     state->ev,
1669                                     state->client,
1670                                     state->max_pnn,
1671                                     max_caps,
1672                                     state->recdb);
1673         if (tevent_req_nomem(subreq, req)) {
1674                 return;
1675         }
1676         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1677                                 req);
1678 }
1679
1680 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1681 {
1682         struct tevent_req *req = tevent_req_callback_data(
1683                 subreq, struct tevent_req);
1684         struct collect_highseqnum_db_state *state = tevent_req_data(
1685                 req, struct collect_highseqnum_db_state);
1686         int ret;
1687         bool status;
1688
1689         status = pull_database_recv(subreq, &ret);
1690         TALLOC_FREE(subreq);
1691         if (! status) {
1692                 node_list_ban_credits(state->nlist, state->max_pnn);
1693                 tevent_req_error(req, ret);
1694                 return;
1695         }
1696
1697         tevent_req_done(req);
1698 }
1699
/*
 * Collect the result of a collect_highseqnum_db request.
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1704
1705 /*
1706  * Collect all databases
1707  */
1708
/*
 * Request state for collecting a database from every node in nlist,
 * one node after the other.
 */
1709 struct collect_all_db_state {
1710         struct tevent_context *ev;
1711         struct ctdb_client_context *client;
1712         struct node_list *nlist;
1713         uint32_t db_id;
1714         struct recdb_context *recdb;
1715
1716         struct ctdb_pulldb pulldb; /* NOTE(review): not referenced by the collect_all_db functions - confirm whether still needed */
1717         unsigned int index; /* index in nlist of the node currently being pulled from */
1718 };
1719
1720 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1721
1722 static struct tevent_req *collect_all_db_send(
1723                         TALLOC_CTX *mem_ctx,
1724                         struct tevent_context *ev,
1725                         struct ctdb_client_context *client,
1726                         struct node_list *nlist,
1727                         uint32_t db_id,
1728                         struct recdb_context *recdb)
1729 {
1730         struct tevent_req *req, *subreq;
1731         struct collect_all_db_state *state;
1732
1733         req = tevent_req_create(mem_ctx, &state,
1734                                 struct collect_all_db_state);
1735         if (req == NULL) {
1736                 return NULL;
1737         }
1738
1739         state->ev = ev;
1740         state->client = client;
1741         state->nlist = nlist;
1742         state->db_id = db_id;
1743         state->recdb = recdb;
1744         state->index = 0;
1745
1746         subreq = pull_database_send(state,
1747                                     ev,
1748                                     client,
1749                                     nlist->pnn_list[state->index],
1750                                     nlist->caps[state->index],
1751                                     recdb);
1752         if (tevent_req_nomem(subreq, req)) {
1753                 return tevent_req_post(req, ev);
1754         }
1755         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1756
1757         return req;
1758 }
1759
1760 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1761 {
1762         struct tevent_req *req = tevent_req_callback_data(
1763                 subreq, struct tevent_req);
1764         struct collect_all_db_state *state = tevent_req_data(
1765                 req, struct collect_all_db_state);
1766         int ret;
1767         bool status;
1768
1769         status = pull_database_recv(subreq, &ret);
1770         TALLOC_FREE(subreq);
1771         if (! status) {
1772                 node_list_ban_credits(state->nlist,
1773                                       state->nlist->pnn_list[state->index]);
1774                 tevent_req_error(req, ret);
1775                 return;
1776         }
1777
1778         state->index += 1;
1779         if (state->index == state->nlist->count) {
1780                 tevent_req_done(req);
1781                 return;
1782         }
1783
1784         subreq = pull_database_send(state,
1785                                     state->ev,
1786                                     state->client,
1787                                     state->nlist->pnn_list[state->index],
1788                                     state->nlist->caps[state->index],
1789                                     state->recdb);
1790         if (tevent_req_nomem(subreq, req)) {
1791                 return;
1792         }
1793         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1794 }
1795
/*
 * Collect the result of collect_all_db_send().
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code in *perr.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (!tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1800
1801
1802 /**
1803  * For each database do the following:
1804  *  - Get DB name from all nodes
1805  *  - Attach database on missing nodes
1806  *  - Get DB path
1807  *  - Freeze database on all nodes
1808  *  - Start transaction on all nodes
1809  *  - Collect database from all nodes
1810  *  - Wipe database on all nodes
1811  *  - Push database to all nodes
1812  *  - Commit transaction on all nodes
1813  *  - Thaw database on all nodes
1814  */
1815
1816 struct recover_db_state {
1817         struct tevent_context *ev;
1818         struct ctdb_client_context *client;
1819         struct ctdb_tunable_list *tun_list;
1820         struct node_list *nlist;
1821         struct db *db;
1822
1823         uint32_t destnode;
1824         struct ctdb_transdb transdb;
1825
1826         const char *db_name, *db_path;
1827         struct recdb_context *recdb;
1828 };
1829
/* Callbacks of the recover_db request, listed in the order they run */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_create_missing_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
1840
1841 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1842                                           struct tevent_context *ev,
1843                                           struct ctdb_client_context *client,
1844                                           struct ctdb_tunable_list *tun_list,
1845                                           struct node_list *nlist,
1846                                           uint32_t generation,
1847                                           struct db *db)
1848 {
1849         struct tevent_req *req, *subreq;
1850         struct recover_db_state *state;
1851         struct ctdb_req_control request;
1852
1853         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1854         if (req == NULL) {
1855                 return NULL;
1856         }
1857
1858         state->ev = ev;
1859         state->client = client;
1860         state->tun_list = tun_list;
1861         state->nlist = nlist;
1862         state->db = db;
1863
1864         state->destnode = ctdb_client_pnn(client);
1865         state->transdb.db_id = db->db_id;
1866         state->transdb.tid = generation;
1867
1868         ctdb_req_control_get_dbname(&request, db->db_id);
1869         subreq = ctdb_client_control_multi_send(state,
1870                                                 ev,
1871                                                 client,
1872                                                 state->db->pnn_list,
1873                                                 state->db->num_nodes,
1874                                                 TIMEOUT(),
1875                                                 &request);
1876         if (tevent_req_nomem(subreq, req)) {
1877                 return tevent_req_post(req, ev);
1878         }
1879         tevent_req_set_callback(subreq, recover_db_name_done, req);
1880
1881         return req;
1882 }
1883
1884 static void recover_db_name_done(struct tevent_req *subreq)
1885 {
1886         struct tevent_req *req = tevent_req_callback_data(
1887                 subreq, struct tevent_req);
1888         struct recover_db_state *state = tevent_req_data(
1889                 req, struct recover_db_state);
1890         struct ctdb_reply_control **reply;
1891         int *err_list;
1892         unsigned int i;
1893         int ret;
1894         bool status;
1895
1896         status = ctdb_client_control_multi_recv(subreq,
1897                                                 &ret,
1898                                                 state,
1899                                                 &err_list,
1900                                                 &reply);
1901         TALLOC_FREE(subreq);
1902         if (! status) {
1903                 int ret2;
1904                 uint32_t pnn;
1905
1906                 ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
1907                                                        state->db->num_nodes,
1908                                                        err_list,
1909                                                        &pnn);
1910                 if (ret2 != 0) {
1911                         D_ERR("control GET_DBNAME failed on node %u,"
1912                               " ret=%d\n",
1913                               pnn,
1914                               ret2);
1915                 } else {
1916                         D_ERR("control GET_DBNAME failed, ret=%d\n",
1917                               ret);
1918                 }
1919                 tevent_req_error(req, ret);
1920                 return;
1921         }
1922
1923         for (i = 0; i < state->db->num_nodes; i++) {
1924                 const char *db_name;
1925                 uint32_t pnn;
1926
1927                 pnn = state->nlist->pnn_list[i];
1928
1929                 ret = ctdb_reply_control_get_dbname(reply[i],
1930                                                     state,
1931                                                     &db_name);
1932                 if (ret != 0) {
1933                         D_ERR("control GET_DBNAME failed on node %u "
1934                               "for db=0x%x, ret=%d\n",
1935                               pnn,
1936                               state->db->db_id,
1937                               ret);
1938                         tevent_req_error(req, EPROTO);
1939                         return;
1940                 }
1941
1942                 if (state->db_name == NULL) {
1943                         state->db_name = db_name;
1944                         continue;
1945                 }
1946
1947                 if (strcmp(state->db_name, db_name) != 0) {
1948                         D_ERR("Incompatible database name for 0x%"PRIx32" "
1949                               "(%s != %s) on node %"PRIu32"\n",
1950                               state->db->db_id,
1951                               db_name,
1952                               state->db_name,
1953                               pnn);
1954                         node_list_ban_credits(state->nlist, pnn);
1955                         tevent_req_error(req, ret);
1956                         return;
1957                 }
1958         }
1959
1960         talloc_free(reply);
1961
1962         subreq = db_create_missing_send(state,
1963                                         state->ev,
1964                                         state->client,
1965                                         state->nlist,
1966                                         state->db_name,
1967                                         state->db);
1968
1969         if (tevent_req_nomem(subreq, req)) {
1970                 return;
1971         }
1972         tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
1973 }
1974
1975 static void recover_db_create_missing_done(struct tevent_req *subreq)
1976 {
1977         struct tevent_req *req = tevent_req_callback_data(
1978                 subreq, struct tevent_req);
1979         struct recover_db_state *state = tevent_req_data(
1980                 req, struct recover_db_state);
1981         struct ctdb_req_control request;
1982         int ret;
1983         bool status;
1984
1985         /* Could sanity check the db_id here */
1986         status = db_create_missing_recv(subreq, &ret);
1987         TALLOC_FREE(subreq);
1988         if (! status) {
1989                 tevent_req_error(req, ret);
1990                 return;
1991         }
1992
1993         ctdb_req_control_getdbpath(&request, state->db->db_id);
1994         subreq = ctdb_client_control_send(state, state->ev, state->client,
1995                                           state->destnode, TIMEOUT(),
1996                                           &request);
1997         if (tevent_req_nomem(subreq, req)) {
1998                 return;
1999         }
2000         tevent_req_set_callback(subreq, recover_db_path_done, req);
2001 }
2002
2003 static void recover_db_path_done(struct tevent_req *subreq)
2004 {
2005         struct tevent_req *req = tevent_req_callback_data(
2006                 subreq, struct tevent_req);
2007         struct recover_db_state *state = tevent_req_data(
2008                 req, struct recover_db_state);
2009         struct ctdb_reply_control *reply;
2010         struct ctdb_req_control request;
2011         int ret;
2012         bool status;
2013
2014         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2015         TALLOC_FREE(subreq);
2016         if (! status) {
2017                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
2018                       state->db_name, ret);
2019                 tevent_req_error(req, ret);
2020                 return;
2021         }
2022
2023         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
2024         if (ret != 0) {
2025                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
2026                       state->db_name, ret);
2027                 tevent_req_error(req, EPROTO);
2028                 return;
2029         }
2030
2031         talloc_free(reply);
2032
2033         ctdb_req_control_db_freeze(&request, state->db->db_id);
2034         subreq = ctdb_client_control_multi_send(state,
2035                                                 state->ev,
2036                                                 state->client,
2037                                                 state->nlist->pnn_list,
2038                                                 state->nlist->count,
2039                                                 TIMEOUT(),
2040                                                 &request);
2041         if (tevent_req_nomem(subreq, req)) {
2042                 return;
2043         }
2044         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
2045 }
2046
2047 static void recover_db_freeze_done(struct tevent_req *subreq)
2048 {
2049         struct tevent_req *req = tevent_req_callback_data(
2050                 subreq, struct tevent_req);
2051         struct recover_db_state *state = tevent_req_data(
2052                 req, struct recover_db_state);
2053         struct ctdb_req_control request;
2054         int *err_list;
2055         int ret;
2056         bool status;
2057
2058         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2059                                                 NULL);
2060         TALLOC_FREE(subreq);
2061         if (! status) {
2062                 int ret2;
2063                 uint32_t pnn;
2064
2065                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2066                                                        state->nlist->count,
2067                                                        err_list,
2068                                                        &pnn);
2069                 if (ret2 != 0) {
2070                         D_ERR("control FREEZE_DB failed for db %s"
2071                               " on node %u, ret=%d\n",
2072                               state->db_name, pnn, ret2);
2073
2074                         node_list_ban_credits(state->nlist, pnn);
2075                 } else {
2076                         D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
2077                               state->db_name, ret);
2078                 }
2079                 tevent_req_error(req, ret);
2080                 return;
2081         }
2082
2083         ctdb_req_control_db_transaction_start(&request, &state->transdb);
2084         subreq = ctdb_client_control_multi_send(state,
2085                                                 state->ev,
2086                                                 state->client,
2087                                                 state->nlist->pnn_list,
2088                                                 state->nlist->count,
2089                                                 TIMEOUT(),
2090                                                 &request);
2091         if (tevent_req_nomem(subreq, req)) {
2092                 return;
2093         }
2094         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
2095 }
2096
2097 static void recover_db_transaction_started(struct tevent_req *subreq)
2098 {
2099         struct tevent_req *req = tevent_req_callback_data(
2100                 subreq, struct tevent_req);
2101         struct recover_db_state *state = tevent_req_data(
2102                 req, struct recover_db_state);
2103         int *err_list;
2104         uint32_t flags;
2105         int ret;
2106         bool status;
2107
2108         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2109                                                 NULL);
2110         TALLOC_FREE(subreq);
2111         if (! status) {
2112                 int ret2;
2113                 uint32_t pnn;
2114
2115                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2116                                                        state->nlist->count,
2117                                                        err_list,
2118                                                        &pnn);
2119                 if (ret2 != 0) {
2120                         D_ERR("control TRANSACTION_DB failed for db=%s"
2121                               " on node %u, ret=%d\n",
2122                               state->db_name, pnn, ret2);
2123                 } else {
2124                         D_ERR("control TRANSACTION_DB failed for db=%s,"
2125                               " ret=%d\n", state->db_name, ret);
2126                 }
2127                 tevent_req_error(req, ret);
2128                 return;
2129         }
2130
2131         flags = state->db->db_flags;
2132         state->recdb = recdb_create(state,
2133                                     state->db->db_id,
2134                                     state->db_name,
2135                                     state->db_path,
2136                                     state->tun_list->database_hash_size,
2137                                     flags & CTDB_DB_FLAGS_PERSISTENT);
2138         if (tevent_req_nomem(state->recdb, req)) {
2139                 return;
2140         }
2141
2142         if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
2143             (flags & CTDB_DB_FLAGS_REPLICATED)) {
2144                 subreq = collect_highseqnum_db_send(state,
2145                                                     state->ev,
2146                                                     state->client,
2147                                                     state->nlist,
2148                                                     state->db->db_id,
2149                                                     state->recdb);
2150         } else {
2151                 subreq = collect_all_db_send(state,
2152                                              state->ev,
2153                                              state->client,
2154                                              state->nlist,
2155                                              state->db->db_id,
2156                                              state->recdb);
2157         }
2158         if (tevent_req_nomem(subreq, req)) {
2159                 return;
2160         }
2161         tevent_req_set_callback(subreq, recover_db_collect_done, req);
2162 }
2163
2164 static void recover_db_collect_done(struct tevent_req *subreq)
2165 {
2166         struct tevent_req *req = tevent_req_callback_data(
2167                 subreq, struct tevent_req);
2168         struct recover_db_state *state = tevent_req_data(
2169                 req, struct recover_db_state);
2170         struct ctdb_req_control request;
2171         int ret;
2172         bool status;
2173
2174         if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
2175             (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
2176                 status = collect_highseqnum_db_recv(subreq, &ret);
2177         } else {
2178                 status = collect_all_db_recv(subreq, &ret);
2179         }
2180         TALLOC_FREE(subreq);
2181         if (! status) {
2182                 tevent_req_error(req, ret);
2183                 return;
2184         }
2185
2186         ctdb_req_control_wipe_database(&request, &state->transdb);
2187         subreq = ctdb_client_control_multi_send(state,
2188                                                 state->ev,
2189                                                 state->client,
2190                                                 state->nlist->pnn_list,
2191                                                 state->nlist->count,
2192                                                 TIMEOUT(),
2193                                                 &request);
2194         if (tevent_req_nomem(subreq, req)) {
2195                 return;
2196         }
2197         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
2198 }
2199
2200 static void recover_db_wipedb_done(struct tevent_req *subreq)
2201 {
2202         struct tevent_req *req = tevent_req_callback_data(
2203                 subreq, struct tevent_req);
2204         struct recover_db_state *state = tevent_req_data(
2205                 req, struct recover_db_state);
2206         int *err_list;
2207         int ret;
2208         bool status;
2209
2210         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2211                                                 NULL);
2212         TALLOC_FREE(subreq);
2213         if (! status) {
2214                 int ret2;
2215                 uint32_t pnn;
2216
2217                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2218                                                        state->nlist->count,
2219                                                        err_list,
2220                                                        &pnn);
2221                 if (ret2 != 0) {
2222                         D_ERR("control WIPEDB failed for db %s on node %u,"
2223                               " ret=%d\n", state->db_name, pnn, ret2);
2224                 } else {
2225                         D_ERR("control WIPEDB failed for db %s, ret=%d\n",
2226                               state->db_name, ret);
2227                 }
2228                 tevent_req_error(req, ret);
2229                 return;
2230         }
2231
2232         subreq = push_database_send(state,
2233                                     state->ev,
2234                                     state->client,
2235                                     state->nlist,
2236                                     state->tun_list,
2237                                     state->recdb);
2238         if (tevent_req_nomem(subreq, req)) {
2239                 return;
2240         }
2241         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
2242 }
2243
2244 static void recover_db_pushdb_done(struct tevent_req *subreq)
2245 {
2246         struct tevent_req *req = tevent_req_callback_data(
2247                 subreq, struct tevent_req);
2248         struct recover_db_state *state = tevent_req_data(
2249                 req, struct recover_db_state);
2250         struct ctdb_req_control request;
2251         int ret;
2252         bool status;
2253
2254         status = push_database_recv(subreq, &ret);
2255         TALLOC_FREE(subreq);
2256         if (! status) {
2257                 tevent_req_error(req, ret);
2258                 return;
2259         }
2260
2261         TALLOC_FREE(state->recdb);
2262
2263         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
2264         subreq = ctdb_client_control_multi_send(state,
2265                                                 state->ev,
2266                                                 state->client,
2267                                                 state->nlist->pnn_list,
2268                                                 state->nlist->count,
2269                                                 TIMEOUT(),
2270                                                 &request);
2271         if (tevent_req_nomem(subreq, req)) {
2272                 return;
2273         }
2274         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
2275 }
2276
2277 static void recover_db_transaction_committed(struct tevent_req *subreq)
2278 {
2279         struct tevent_req *req = tevent_req_callback_data(
2280                 subreq, struct tevent_req);
2281         struct recover_db_state *state = tevent_req_data(
2282                 req, struct recover_db_state);
2283         struct ctdb_req_control request;
2284         int *err_list;
2285         int ret;
2286         bool status;
2287
2288         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2289                                                 NULL);
2290         TALLOC_FREE(subreq);
2291         if (! status) {
2292                 int ret2;
2293                 uint32_t pnn;
2294
2295                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2296                                                        state->nlist->count,
2297                                                        err_list,
2298                                                        &pnn);
2299                 if (ret2 != 0) {
2300                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
2301                               " on node %u, ret=%d\n",
2302                               state->db_name, pnn, ret2);
2303                 } else {
2304                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
2305                               " ret=%d\n", state->db_name, ret);
2306                 }
2307                 tevent_req_error(req, ret);
2308                 return;
2309         }
2310
2311         ctdb_req_control_db_thaw(&request, state->db->db_id);
2312         subreq = ctdb_client_control_multi_send(state,
2313                                                 state->ev,
2314                                                 state->client,
2315                                                 state->nlist->pnn_list,
2316                                                 state->nlist->count,
2317                                                 TIMEOUT(),
2318                                                 &request);
2319         if (tevent_req_nomem(subreq, req)) {
2320                 return;
2321         }
2322         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
2323 }
2324
2325 static void recover_db_thaw_done(struct tevent_req *subreq)
2326 {
2327         struct tevent_req *req = tevent_req_callback_data(
2328                 subreq, struct tevent_req);
2329         struct recover_db_state *state = tevent_req_data(
2330                 req, struct recover_db_state);
2331         int *err_list;
2332         int ret;
2333         bool status;
2334
2335         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2336                                                 NULL);
2337         TALLOC_FREE(subreq);
2338         if (! status) {
2339                 int ret2;
2340                 uint32_t pnn;
2341
2342                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2343                                                        state->nlist->count,
2344                                                        err_list,
2345                                                        &pnn);
2346                 if (ret2 != 0) {
2347                         D_ERR("control DB_THAW failed for db %s on node %u,"
2348                               " ret=%d\n", state->db_name, pnn, ret2);
2349                 } else {
2350                         D_ERR("control DB_THAW failed for db %s, ret=%d\n",
2351                               state->db_name, ret);
2352                 }
2353                 tevent_req_error(req, ret);
2354                 return;
2355         }
2356
2357         tevent_req_done(req);
2358 }
2359
/*
 * Collect the result of recover_db_send().
 *
 * Returns true if the database was recovered, false otherwise.  The error
 * code itself is not reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	int unused_err;

	if (tevent_req_is_unix_error(req, &unused_err)) {
		return false;
	}
	return true;
}
2364
2365
2366 /*
2367  * Start database recovery for each database
2368  *
2369  * Try to recover each database NUM_RETRIES (3) times before failing recovery.
2370  */
2371
/* State of the request that recovers every database in a db_list */
struct db_recovery_state {
	struct tevent_context *ev;
	struct db_list *dblist;
	/* Databases that have finished, whether recovered or given up on */
	unsigned int num_replies;
	/* Databases whose recovery was given up on */
	unsigned int num_failed;
};
2378
/*
 * Per-database bookkeeping for db_recovery: holds everything needed to
 * restart recover_db_send() for one database after a failed attempt.
 */
struct db_recovery_one_state {
	/* The overall db_recovery request this database belongs to */
	struct tevent_req *req;
	struct ctdb_client_context *client;
	struct db_list *dblist;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	uint32_t generation;
	struct db *db;
	/* Number of failed recovery attempts so far */
	int num_fails;
};
2389
/* Completion callback for each per-database recover_db request */
static void db_recovery_one_done(struct tevent_req *subreq);
2391
2392 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2393                                            struct tevent_context *ev,
2394                                            struct ctdb_client_context *client,
2395                                            struct db_list *dblist,
2396                                            struct ctdb_tunable_list *tun_list,
2397                                            struct node_list *nlist,
2398                                            uint32_t generation)
2399 {
2400         struct tevent_req *req, *subreq;
2401         struct db_recovery_state *state;
2402         struct db *db;
2403
2404         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2405         if (req == NULL) {
2406                 return NULL;
2407         }
2408
2409         state->ev = ev;
2410         state->dblist = dblist;
2411         state->num_replies = 0;
2412         state->num_failed = 0;
2413
2414         if (dblist->num_dbs == 0) {
2415                 tevent_req_done(req);
2416                 return tevent_req_post(req, ev);
2417         }
2418
2419         for (db = dblist->db; db != NULL; db = db->next) {
2420                 struct db_recovery_one_state *substate;
2421
2422                 substate = talloc_zero(state, struct db_recovery_one_state);
2423                 if (tevent_req_nomem(substate, req)) {
2424                         return tevent_req_post(req, ev);
2425                 }
2426
2427                 substate->req = req;
2428                 substate->client = client;
2429                 substate->dblist = dblist;
2430                 substate->tun_list = tun_list;
2431                 substate->nlist = nlist;
2432                 substate->generation = generation;
2433                 substate->db = db;
2434
2435                 subreq = recover_db_send(state,
2436                                          ev,
2437                                          client,
2438                                          tun_list,
2439                                          nlist,
2440                                          generation,
2441                                          substate->db);
2442                 if (tevent_req_nomem(subreq, req)) {
2443                         return tevent_req_post(req, ev);
2444                 }
2445                 tevent_req_set_callback(subreq, db_recovery_one_done,
2446                                         substate);
2447                 D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
2448         }
2449
2450         return req;
2451 }
2452
2453 static void db_recovery_one_done(struct tevent_req *subreq)
2454 {
2455         struct db_recovery_one_state *substate = tevent_req_callback_data(
2456                 subreq, struct db_recovery_one_state);
2457         struct tevent_req *req = substate->req;
2458         struct db_recovery_state *state = tevent_req_data(
2459                 req, struct db_recovery_state);
2460         bool status;
2461
2462         status = recover_db_recv(subreq);
2463         TALLOC_FREE(subreq);
2464
2465         if (status) {
2466                 talloc_free(substate);
2467                 goto done;
2468         }
2469
2470         substate->num_fails += 1;
2471         if (substate->num_fails < NUM_RETRIES) {
2472                 subreq = recover_db_send(state,
2473                                          state->ev,
2474                                          substate->client,
2475                                          substate->tun_list,
2476                                          substate->nlist,
2477                                          substate->generation,
2478                                          substate->db);
2479                 if (tevent_req_nomem(subreq, req)) {
2480                         goto failed;
2481                 }
2482                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2483                 D_NOTICE("recover database 0x%08x, attempt %d\n",
2484                          substate->db->db_id, substate->num_fails+1);
2485                 return;
2486         }
2487
2488 failed:
2489         state->num_failed += 1;
2490
2491 done:
2492         state->num_replies += 1;
2493
2494         if (state->num_replies == state->dblist->num_dbs) {
2495                 tevent_req_done(req);
2496         }
2497 }
2498
2499 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2500 {
2501         struct db_recovery_state *state = tevent_req_data(
2502                 req, struct db_recovery_state);
2503         int err;
2504
2505         if (tevent_req_is_unix_error(req, &err)) {
2506                 *count = 0;
2507                 return false;
2508         }
2509
2510         *count = state->num_replies - state->num_failed;
2511
2512         if (state->num_failed > 0) {
2513                 return false;
2514         }
2515
2516         return true;
2517 }
2518
/* State of the ban_node request */
struct ban_node_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	/* PNN reported by ctdb_client_pnn() for this client */
	uint32_t destnode;

	uint32_t max_pnn;
};
2528
/*
 * Steps of the ban_node request, defined below:
 * pick a candidate, examine its nodemap, then ban it.
 */
static bool ban_node_check(struct tevent_req *req);
static void ban_node_check_done(struct tevent_req *subreq);
static void ban_node_done(struct tevent_req *subreq);
2532
2533 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2534                                         struct tevent_context *ev,
2535                                         struct ctdb_client_context *client,
2536                                         struct ctdb_tunable_list *tun_list,
2537                                         struct node_list *nlist)
2538 {
2539         struct tevent_req *req;
2540         struct ban_node_state *state;
2541         bool ok;
2542
2543         req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2544         if (req == NULL) {
2545                 return NULL;
2546         }
2547
2548         state->ev = ev;
2549         state->client = client;
2550         state->tun_list = tun_list;
2551         state->nlist = nlist;
2552         state->destnode = ctdb_client_pnn(client);
2553
2554         /* Bans are not enabled */
2555         if (state->tun_list->enable_bans == 0) {
2556                 D_ERR("Bans are not enabled\n");
2557                 tevent_req_done(req);
2558                 return tevent_req_post(req, ev);
2559         }
2560
2561         ok = ban_node_check(req);
2562         if (!ok) {
2563                 return tevent_req_post(req, ev);
2564         }
2565
2566         return req;
2567 }
2568
2569 static bool ban_node_check(struct tevent_req *req)
2570 {
2571         struct tevent_req *subreq;
2572         struct ban_node_state *state = tevent_req_data(
2573                 req, struct ban_node_state);
2574         struct ctdb_req_control request;
2575         unsigned max_credits = 0, i;
2576
2577         for (i=0; i<state->nlist->count; i++) {
2578                 if (state->nlist->ban_credits[i] > max_credits) {
2579                         state->max_pnn = state->nlist->pnn_list[i];
2580                         max_credits = state->nlist->ban_credits[i];
2581                 }
2582         }
2583
2584         if (max_credits < NUM_RETRIES) {
2585                 tevent_req_done(req);
2586                 return false;
2587         }
2588
2589         ctdb_req_control_get_nodemap(&request);
2590         subreq = ctdb_client_control_send(state,
2591                                           state->ev,
2592                                           state->client,
2593                                           state->max_pnn,
2594                                           TIMEOUT(),
2595                                           &request);
2596         if (tevent_req_nomem(subreq, req)) {
2597                 return false;
2598         }
2599         tevent_req_set_callback(subreq, ban_node_check_done, req);
2600
2601         return true;
2602 }
2603
2604 static void ban_node_check_done(struct tevent_req *subreq)
2605 {
2606         struct tevent_req *req = tevent_req_callback_data(
2607                 subreq, struct tevent_req);
2608         struct ban_node_state *state = tevent_req_data(
2609                 req, struct ban_node_state);
2610         struct ctdb_reply_control *reply;
2611         struct ctdb_node_map *nodemap;
2612         struct ctdb_req_control request;
2613         struct ctdb_ban_state ban;
2614         unsigned int i;
2615         int ret;
2616         bool ok;
2617
2618         ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2619         TALLOC_FREE(subreq);
2620         if (!ok) {
2621                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2622                       state->max_pnn, ret);
2623                 tevent_req_error(req, ret);
2624                 return;
2625         }
2626
2627         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2628         if (ret != 0) {
2629                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2630                 tevent_req_error(req, ret);
2631                 return;
2632         }
2633
2634         for (i=0; i<nodemap->num; i++) {
2635                 if (nodemap->node[i].pnn != state->max_pnn) {
2636                         continue;
2637                 }
2638
2639                 /* If the node became inactive, reset ban_credits */
2640                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2641                         unsigned int j;
2642
2643                         for (j=0; j<state->nlist->count; j++) {
2644                                 if (state->nlist->pnn_list[j] ==
2645                                                 state->max_pnn) {
2646                                         state->nlist->ban_credits[j] = 0;
2647                                         break;
2648                                 }
2649                         }
2650                         state->max_pnn = CTDB_UNKNOWN_PNN;
2651                 }
2652         }
2653
2654         talloc_free(nodemap);
2655         talloc_free(reply);
2656
2657         /* If node becames inactive during recovery, pick next */
2658         if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2659                 (void) ban_node_check(req);
2660                 return;
2661         }
2662
2663         ban = (struct ctdb_ban_state) {
2664                 .pnn = state->max_pnn,
2665                 .time = state->tun_list->recovery_ban_period,
2666         };
2667
2668         D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2669
2670         ctdb_req_control_set_ban_state(&request, &ban);
2671         subreq = ctdb_client_control_send(state,
2672                                           state->ev,
2673                                           state->client,
2674                                           ban.pnn,
2675                                           TIMEOUT(),
2676                                           &request);
2677         if (tevent_req_nomem(subreq, req)) {
2678                 return;
2679         }
2680         tevent_req_set_callback(subreq, ban_node_done, req);
2681 }
2682
2683 static void ban_node_done(struct tevent_req *subreq)
2684 {
2685         struct tevent_req *req = tevent_req_callback_data(
2686                 subreq, struct tevent_req);
2687         struct node_ban_state *state = tevent_req_data(
2688                 req, struct node_ban_state);
2689         struct ctdb_reply_control *reply;
2690         int ret;
2691         bool status;
2692
2693         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2694         TALLOC_FREE(subreq);
2695         if (! status) {
2696                 tevent_req_error(req, ret);
2697                 return;
2698         }
2699
2700         ret = ctdb_reply_control_set_ban_state(reply);
2701         if (ret != 0) {
2702                 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2703                 tevent_req_error(req, ret);
2704                 return;
2705         }
2706
2707         talloc_free(reply);
2708         tevent_req_done(req);
2709 }
2710
/*
 * Collect the result of a ban_node request
 *
 * Returns false if the request ended in error, with the error code
 * reported through perr by tevent_req_is_unix_error(); true otherwise.
 */
static bool ban_node_recv(struct tevent_req *req, int *perr)
{
        bool failed = tevent_req_is_unix_error(req, perr);

        return !failed;
}
2719
2720 /*
2721  * Run the parallel database recovery
2722  *
2723  * - Get tunables
2724  * - Get nodemap from all nodes
2725  * - Get capabilities from all nodes
2726  * - Get dbmap
2727  * - Set RECOVERY_ACTIVE
2728  * - Send START_RECOVERY
2729  * - Update vnnmap on all nodes
2730  * - Run database recovery
2731  * - Set RECOVERY_NORMAL
2732  * - Send END_RECOVERY
2733  */
2734
/* State carried through all the steps of the recovery request */
struct recovery_state {
        /* Arguments of recovery_send() */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        uint32_t generation;

        /* PNN returned by ctdb_client_pnn() for the client */
        uint32_t destnode;

        /*
         * Nodes taking part in the recovery: built from the nodemap in
         * recovery_nodemap_done(), replaced in recovery_nodemap_verify()
         */
        struct node_list *nlist;

        /* Tunables fetched with GET_ALL_TUNABLES */
        struct ctdb_tunable_list *tun_list;

        /* VNN map calculated in recovery_active_done() */
        struct ctdb_vnn_map *vnnmap;

        /* Databases collected from the GET_DBMAP replies */
        struct db_list *dblist;
};
2745
/* Completion callbacks of the recovery request, one per step */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_nodemap_verify(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2758
2759 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2760                                         struct tevent_context *ev,
2761                                         struct ctdb_client_context *client,
2762                                         uint32_t generation)
2763 {
2764         struct tevent_req *req, *subreq;
2765         struct recovery_state *state;
2766         struct ctdb_req_control request;
2767
2768         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2769         if (req == NULL) {
2770                 return NULL;
2771         }
2772
2773         state->ev = ev;
2774         state->client = client;
2775         state->generation = generation;
2776         state->destnode = ctdb_client_pnn(client);
2777
2778         ctdb_req_control_get_all_tunables(&request);
2779         subreq = ctdb_client_control_send(state, state->ev, state->client,
2780                                           state->destnode, TIMEOUT(),
2781                                           &request);
2782         if (tevent_req_nomem(subreq, req)) {
2783                 return tevent_req_post(req, ev);
2784         }
2785         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2786
2787         return req;
2788 }
2789
2790 static void recovery_tunables_done(struct tevent_req *subreq)
2791 {
2792         struct tevent_req *req = tevent_req_callback_data(
2793                 subreq, struct tevent_req);
2794         struct recovery_state *state = tevent_req_data(
2795                 req, struct recovery_state);
2796         struct ctdb_reply_control *reply;
2797         struct ctdb_req_control request;
2798         int ret;
2799         bool status;
2800
2801         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2802         TALLOC_FREE(subreq);
2803         if (! status) {
2804                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2805                 tevent_req_error(req, ret);
2806                 return;
2807         }
2808
2809         ret = ctdb_reply_control_get_all_tunables(reply, state,
2810                                                   &state->tun_list);
2811         if (ret != 0) {
2812                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2813                 tevent_req_error(req, EPROTO);
2814                 return;
2815         }
2816
2817         talloc_free(reply);
2818
2819         recover_timeout = state->tun_list->recover_timeout;
2820
2821         ctdb_req_control_get_nodemap(&request);
2822         subreq = ctdb_client_control_send(state, state->ev, state->client,
2823                                           state->destnode, TIMEOUT(),
2824                                           &request);
2825         if (tevent_req_nomem(subreq, req)) {
2826                 return;
2827         }
2828         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2829 }
2830
2831 static void recovery_nodemap_done(struct tevent_req *subreq)
2832 {
2833         struct tevent_req *req = tevent_req_callback_data(
2834                 subreq, struct tevent_req);
2835         struct recovery_state *state = tevent_req_data(
2836                 req, struct recovery_state);
2837         struct ctdb_reply_control *reply;
2838         struct ctdb_req_control request;
2839         struct ctdb_node_map *nodemap;
2840         unsigned int i;
2841         bool status;
2842         int ret;
2843
2844         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2845         TALLOC_FREE(subreq);
2846         if (! status) {
2847                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2848                       state->destnode, ret);
2849                 tevent_req_error(req, ret);
2850                 return;
2851         }
2852
2853         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2854         if (ret != 0) {
2855                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2856                 tevent_req_error(req, ret);
2857                 return;
2858         }
2859
2860         state->nlist = node_list_init(state, nodemap->num);
2861         if (tevent_req_nomem(state->nlist, req)) {
2862                 return;
2863         }
2864
2865         for (i=0; i<nodemap->num; i++) {
2866                 bool ok;
2867
2868                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2869                         continue;
2870                 }
2871
2872                 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2873                 if (!ok) {
2874                         tevent_req_error(req, EINVAL);
2875                         return;
2876                 }
2877         }
2878
2879         talloc_free(nodemap);
2880         talloc_free(reply);
2881
2882         /* Verify flags by getting local node information from each node */
2883         ctdb_req_control_get_nodemap(&request);
2884         subreq = ctdb_client_control_multi_send(state,
2885                                                 state->ev,
2886                                                 state->client,
2887                                                 state->nlist->pnn_list,
2888                                                 state->nlist->count,
2889                                                 TIMEOUT(),
2890                                                 &request);
2891         if (tevent_req_nomem(subreq, req)) {
2892                 return;
2893         }
2894         tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
2895 }
2896
2897 static void recovery_nodemap_verify(struct tevent_req *subreq)
2898 {
2899         struct tevent_req *req = tevent_req_callback_data(
2900                 subreq, struct tevent_req);
2901         struct recovery_state *state = tevent_req_data(
2902                 req, struct recovery_state);
2903         struct ctdb_req_control request;
2904         struct ctdb_reply_control **reply;
2905         struct node_list *nlist;
2906         unsigned int i;
2907         int *err_list;
2908         int ret;
2909         bool status;
2910
2911         status = ctdb_client_control_multi_recv(subreq,
2912                                                 &ret,
2913                                                 state,
2914                                                 &err_list,
2915                                                 &reply);
2916         TALLOC_FREE(subreq);
2917         if (! status) {
2918                 int ret2;
2919                 uint32_t pnn;
2920
2921                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2922                                                        state->nlist->count,
2923                                                        err_list,
2924                                                        &pnn);
2925                 if (ret2 != 0) {
2926                         D_ERR("control GET_NODEMAP failed on node %u,"
2927                               " ret=%d\n", pnn, ret2);
2928                 } else {
2929                         D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2930                 }
2931                 tevent_req_error(req, ret);
2932                 return;
2933         }
2934
2935         nlist = node_list_init(state, state->nlist->size);
2936         if (tevent_req_nomem(nlist, req)) {
2937                 return;
2938         }
2939
2940         for (i=0; i<state->nlist->count; i++) {
2941                 struct ctdb_node_map *nodemap = NULL;
2942                 uint32_t pnn, flags;
2943                 unsigned int j;
2944                 bool ok;
2945
2946                 pnn = state->nlist->pnn_list[i];
2947                 ret = ctdb_reply_control_get_nodemap(reply[i],
2948                                                      state,
2949                                                      &nodemap);
2950                 if (ret != 0) {
2951                         D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2952                         tevent_req_error(req, EPROTO);
2953                         return;
2954                 }
2955
2956                 flags = NODE_FLAGS_DISCONNECTED;
2957                 for (j=0; j<nodemap->num; j++) {
2958                         if (nodemap->node[j].pnn == pnn) {
2959                                 flags = nodemap->node[j].flags;
2960                                 break;
2961                         }
2962                 }
2963
2964                 TALLOC_FREE(nodemap);
2965
2966                 if (flags & NODE_FLAGS_INACTIVE) {
2967                         continue;
2968                 }
2969
2970                 ok = node_list_add(nlist, pnn);
2971                 if (!ok) {
2972                         tevent_req_error(req, EINVAL);
2973                         return;
2974                 }
2975         }
2976
2977         talloc_free(reply);
2978
2979         talloc_free(state->nlist);
2980         state->nlist = nlist;
2981
2982         ctdb_req_control_get_capabilities(&request);
2983         subreq = ctdb_client_control_multi_send(state,
2984                                                 state->ev,
2985                                                 state->client,
2986                                                 state->nlist->pnn_list,
2987                                                 state->nlist->count,
2988                                                 TIMEOUT(),
2989                                                 &request);
2990         if (tevent_req_nomem(subreq, req)) {
2991                 return;
2992         }
2993         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2994 }
2995
2996 static void recovery_capabilities_done(struct tevent_req *subreq)
2997 {
2998         struct tevent_req *req = tevent_req_callback_data(
2999                 subreq, struct tevent_req);
3000         struct recovery_state *state = tevent_req_data(
3001                 req, struct recovery_state);
3002         struct ctdb_reply_control **reply;
3003         struct ctdb_req_control request;
3004         int *err_list;
3005         unsigned int i;
3006         int ret;
3007         bool status;
3008
3009         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3010                                                 &reply);
3011         TALLOC_FREE(subreq);
3012         if (! status) {
3013                 int ret2;
3014                 uint32_t pnn;
3015
3016                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3017                                                        state->nlist->count,
3018                                                        err_list,
3019                                                        &pnn);
3020                 if (ret2 != 0) {
3021                         D_ERR("control GET_CAPABILITIES failed on node %u,"
3022                               " ret=%d\n", pnn, ret2);
3023                 } else {
3024                         D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
3025                               ret);
3026                 }
3027                 tevent_req_error(req, ret);
3028                 return;
3029         }
3030
3031         for (i=0; i<state->nlist->count; i++) {
3032                 uint32_t caps;
3033
3034                 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
3035                 if (ret != 0) {
3036                         D_ERR("control GET_CAPABILITIES failed on node %u\n",
3037                               state->nlist->pnn_list[i]);
3038                         tevent_req_error(req, EPROTO);
3039                         return;
3040                 }
3041
3042                 state->nlist->caps[i] = caps;
3043         }
3044
3045         talloc_free(reply);
3046
3047         ctdb_req_control_get_dbmap(&request);
3048         subreq = ctdb_client_control_multi_send(state,
3049                                                 state->ev,
3050                                                 state->client,
3051                                                 state->nlist->pnn_list,
3052                                                 state->nlist->count,
3053                                                 TIMEOUT(),
3054                                                 &request);
3055         if (tevent_req_nomem(subreq, req)) {
3056                 return;
3057         }
3058         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
3059 }
3060
3061 static void recovery_dbmap_done(struct tevent_req *subreq)
3062 {
3063         struct tevent_req *req = tevent_req_callback_data(
3064                 subreq, struct tevent_req);
3065         struct recovery_state *state = tevent_req_data(
3066                 req, struct recovery_state);
3067         struct ctdb_reply_control **reply;
3068         struct ctdb_req_control request;
3069         int *err_list;
3070         unsigned int i, j;
3071         int ret;
3072         bool status;
3073
3074         status = ctdb_client_control_multi_recv(subreq,
3075                                                 &ret,
3076                                                 state,
3077                                                 &err_list,
3078                                                 &reply);
3079         TALLOC_FREE(subreq);
3080         if (! status) {
3081                 int ret2;
3082                 uint32_t pnn;
3083
3084                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3085                                                        state->nlist->count,
3086                                                        err_list,
3087                                                        &pnn);
3088                 if (ret2 != 0) {
3089                         D_ERR("control GET_DBMAP failed on node %u,"
3090                               " ret=%d\n", pnn, ret2);
3091                 } else {
3092                         D_ERR("control GET_DBMAP failed, ret=%d\n",
3093                               ret);
3094                 }
3095                 tevent_req_error(req, ret);
3096                 return;
3097         }
3098
3099         state->dblist = db_list_init(state, state->nlist->count);
3100         if (tevent_req_nomem(state->dblist, req)) {
3101                 D_ERR("memory allocation error\n");
3102                 return;
3103         }
3104
3105         for (i = 0; i < state->nlist->count; i++) {
3106                 struct ctdb_dbid_map *dbmap = NULL;
3107                 uint32_t pnn;
3108
3109                 pnn = state->nlist->pnn_list[i];
3110
3111                 ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
3112                 if (ret != 0) {
3113                         D_ERR("control GET_DBMAP failed on node %u\n",
3114                               pnn);
3115                         tevent_req_error(req, EPROTO);
3116                         return;
3117                 }
3118
3119                 for (j = 0; j < dbmap->num; j++) {
3120                         ret = db_list_check_and_add(state->dblist,
3121                                                     dbmap->dbs[j].db_id,
3122                                                     dbmap->dbs[j].flags,
3123                                                     pnn);
3124                         if (ret != 0) {
3125                                 D_ERR("failed to add database list entry, "
3126                                       "ret=%d\n",
3127                                       ret);
3128                                 tevent_req_error(req, ret);
3129                                 return;
3130                         }
3131                 }
3132
3133                 TALLOC_FREE(dbmap);
3134         }
3135
3136         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
3137         subreq = ctdb_client_control_multi_send(state,
3138                                                 state->ev,
3139                                                 state->client,
3140                                                 state->nlist->pnn_list,
3141                                                 state->nlist->count,
3142                                                 TIMEOUT(),
3143                                                 &request);
3144         if (tevent_req_nomem(subreq, req)) {
3145                 return;
3146         }
3147         tevent_req_set_callback(subreq, recovery_active_done, req);
3148 }
3149
3150 static void recovery_active_done(struct tevent_req *subreq)
3151 {
3152         struct tevent_req *req = tevent_req_callback_data(
3153                 subreq, struct tevent_req);
3154         struct recovery_state *state = tevent_req_data(
3155                 req, struct recovery_state);
3156         struct ctdb_req_control request;
3157         struct ctdb_vnn_map *vnnmap;
3158         int *err_list;
3159         int ret;
3160         bool status;
3161
3162         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3163                                                 NULL);
3164         TALLOC_FREE(subreq);
3165         if (! status) {
3166                 int ret2;
3167                 uint32_t pnn;
3168
3169                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3170                                                        state->nlist->count,
3171                                                        err_list,
3172                                                        &pnn);
3173                 if (ret2 != 0) {
3174                         D_ERR("failed to set recovery mode ACTIVE on node %u,"
3175                               " ret=%d\n", pnn, ret2);
3176                 } else {
3177                         D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
3178                               ret);
3179                 }
3180                 tevent_req_error(req, ret);
3181                 return;
3182         }
3183
3184         D_ERR("Set recovery mode to ACTIVE\n");
3185
3186         /* Calculate new VNNMAP */
3187         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
3188         if (tevent_req_nomem(vnnmap, req)) {
3189                 return;
3190         }
3191
3192         vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
3193         if (tevent_req_nomem(vnnmap->map, req)) {
3194                 return;
3195         }
3196
3197         if (vnnmap->size == 0) {
3198                 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
3199                 vnnmap->map[0] = state->destnode;
3200                 vnnmap->size = 1;
3201         }
3202
3203         vnnmap->generation = state->generation;
3204
3205         state->vnnmap = vnnmap;
3206
3207         ctdb_req_control_start_recovery(&request);
3208         subreq = ctdb_client_control_multi_send(state,
3209                                                 state->ev,
3210                                                 state->client,
3211                                                 state->nlist->pnn_list,
3212                                                 state->nlist->count,
3213                                                 TIMEOUT(),
3214                                                 &request);
3215         if (tevent_req_nomem(subreq, req)) {
3216                 return;
3217         }
3218         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
3219 }
3220
3221 static void recovery_start_recovery_done(struct tevent_req *subreq)
3222 {
3223         struct tevent_req *req = tevent_req_callback_data(
3224                 subreq, struct tevent_req);
3225         struct recovery_state *state = tevent_req_data(
3226                 req, struct recovery_state);
3227         struct ctdb_req_control request;
3228         int *err_list;
3229         int ret;
3230         bool status;
3231
3232         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3233                                                 NULL);
3234         TALLOC_FREE(subreq);
3235         if (! status) {
3236                 int ret2;
3237                 uint32_t pnn;
3238
3239                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3240                                                        state->nlist->count,
3241                                                        err_list,
3242                                                        &pnn);
3243                 if (ret2 != 0) {
3244                         D_ERR("failed to run start_recovery event on node %u,"
3245                               " ret=%d\n", pnn, ret2);
3246                 } else {
3247                         D_ERR("failed to run start_recovery event, ret=%d\n",
3248                               ret);
3249                 }
3250                 tevent_req_error(req, ret);
3251                 return;
3252         }
3253
3254         D_ERR("start_recovery event finished\n");
3255
3256         ctdb_req_control_setvnnmap(&request, state->vnnmap);
3257         subreq = ctdb_client_control_multi_send(state,
3258                                                 state->ev,
3259                                                 state->client,
3260                                                 state->nlist->pnn_list,
3261                                                 state->nlist->count,
3262                                                 TIMEOUT(),
3263                                                 &request);
3264         if (tevent_req_nomem(subreq, req)) {
3265                 return;
3266         }
3267         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
3268 }
3269
3270 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
3271 {
3272         struct tevent_req *req = tevent_req_callback_data(
3273                 subreq, struct tevent_req);
3274         struct recovery_state *state = tevent_req_data(
3275                 req, struct recovery_state);
3276         int *err_list;
3277         int ret;
3278         bool status;
3279
3280         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3281                                                 NULL);
3282         TALLOC_FREE(subreq);
3283         if (! status) {
3284                 int ret2;
3285                 uint32_t pnn;
3286
3287                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3288                                                        state->nlist->count,
3289                                                        err_list,
3290                                                        &pnn);
3291                 if (ret2 != 0) {
3292                         D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
3293                               pnn, ret2);
3294                 } else {
3295                         D_ERR("failed to update VNNMAP, ret=%d\n", ret);
3296                 }
3297                 tevent_req_error(req, ret);
3298                 return;
3299         }
3300
3301         D_NOTICE("updated VNNMAP\n");
3302
3303         subreq = db_recovery_send(state,
3304                                   state->ev,
3305                                   state->client,
3306                                   state->dblist,
3307                                   state->tun_list,
3308                                   state->nlist,
3309                                   state->vnnmap->generation);
3310         if (tevent_req_nomem(subreq, req)) {
3311                 return;
3312         }
3313         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
3314 }
3315
3316 static void recovery_db_recovery_done(struct tevent_req *subreq)
3317 {
3318         struct tevent_req *req = tevent_req_callback_data(
3319                 subreq, struct tevent_req);
3320         struct recovery_state *state = tevent_req_data(
3321                 req, struct recovery_state);
3322         struct ctdb_req_control request;
3323         bool status;
3324         unsigned int count;
3325
3326         status = db_recovery_recv(subreq, &count);
3327         TALLOC_FREE(subreq);
3328
3329         D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
3330
3331         if (! status) {
3332                 subreq = ban_node_send(state,
3333                                        state->ev,
3334                                        state->client,
3335                                        state->tun_list,
3336                                        state->nlist);
3337                 if (tevent_req_nomem(subreq, req)) {
3338                         return;
3339                 }
3340                 tevent_req_set_callback(subreq, recovery_failed_done, req);
3341                 return;
3342         }
3343
3344         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
3345         subreq = ctdb_client_control_multi_send(state,
3346                                                 state->ev,
3347                                                 state->client,
3348                                                 state->nlist->pnn_list,
3349                                                 state->nlist->count,
3350                                                 TIMEOUT(),
3351                                                 &request);
3352         if (tevent_req_nomem(subreq, req)) {
3353                 return;
3354         }
3355         tevent_req_set_callback(subreq, recovery_normal_done, req);
3356 }
3357
3358 static void recovery_failed_done(struct tevent_req *subreq)
3359 {
3360         struct tevent_req *req = tevent_req_callback_data(
3361                 subreq, struct tevent_req);
3362         int ret;
3363         bool status;
3364
3365         status = ban_node_recv(subreq, &ret);
3366         TALLOC_FREE(subreq);
3367         if (! status) {
3368                 D_ERR("failed to ban node, ret=%d\n", ret);
3369         }
3370
3371         tevent_req_error(req, EIO);
3372 }
3373
3374 static void recovery_normal_done(struct tevent_req *subreq)
3375 {
3376         struct tevent_req *req = tevent_req_callback_data(
3377                 subreq, struct tevent_req);
3378         struct recovery_state *state = tevent_req_data(
3379                 req, struct recovery_state);
3380         struct ctdb_req_control request;
3381         int *err_list;
3382         int ret;
3383         bool status;
3384
3385         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3386                                                 NULL);
3387         TALLOC_FREE(subreq);
3388         if (! status) {
3389                 int ret2;
3390                 uint32_t pnn;
3391
3392                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3393                                                        state->nlist->count,
3394                                                        err_list,
3395                                                        &pnn);
3396                 if (ret2 != 0) {
3397                         D_ERR("failed to set recovery mode NORMAL on node %u,"
3398                               " ret=%d\n", pnn, ret2);
3399                 } else {
3400                         D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3401                               ret);
3402                 }
3403                 tevent_req_error(req, ret);
3404                 return;
3405         }
3406
3407         D_ERR("Set recovery mode to NORMAL\n");
3408
3409         ctdb_req_control_end_recovery(&request);
3410         subreq = ctdb_client_control_multi_send(state,
3411                                                 state->ev,
3412                                                 state->client,
3413                                                 state->nlist->pnn_list,
3414                                                 state->nlist->count,
3415                                                 TIMEOUT(),
3416                                                 &request);
3417         if (tevent_req_nomem(subreq, req)) {
3418                 return;
3419         }
3420         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
3421 }
3422
3423 static void recovery_end_recovery_done(struct tevent_req *subreq)
3424 {
3425         struct tevent_req *req = tevent_req_callback_data(
3426                 subreq, struct tevent_req);
3427         struct recovery_state *state = tevent_req_data(
3428                 req, struct recovery_state);
3429         int *err_list;
3430         int ret;
3431         bool status;
3432
3433         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3434                                                 NULL);
3435         TALLOC_FREE(subreq);
3436         if (! status) {
3437                 int ret2;
3438                 uint32_t pnn;
3439
3440                 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3441                                                        state->nlist->count,
3442                                                        err_list,
3443                                                        &pnn);
3444                 if (ret2 != 0) {
3445                         D_ERR("failed to run recovered event on node %u,"
3446                               " ret=%d\n", pnn, ret2);
3447                 } else {
3448                         D_ERR("failed to run recovered event, ret=%d\n", ret);
3449                 }
3450                 tevent_req_error(req, ret);
3451                 return;
3452         }
3453
3454         D_ERR("recovered event finished\n");
3455
3456         tevent_req_done(req);
3457 }
3458
/*
 * Collect the outcome of a recovery request.
 *
 * Delegates to generic_recv(), which stores the error code in *perr when
 * the request failed and perr is not NULL.  The boolean result is
 * deliberately dropped; *perr is left untouched on success.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        (void) generic_recv(req, perr);
}
3463
/*
 * Print the command line synopsis for this helper to stderr.
 */
static void usage(const char *progname)
{
        fprintf(stderr,
                "\nUsage: %s"
                " <output-fd> <ctdb-socket-path> <generation>\n",
                progname);
}
3469
3470
3471 /*
3472  * Arguments - log fd, write fd, socket path, generation
3473  */
3474 int main(int argc, char *argv[])
3475 {
3476         int write_fd;
3477         const char *sockpath;
3478         TALLOC_CTX *mem_ctx = NULL;
3479         struct tevent_context *ev;
3480         struct ctdb_client_context *client;
3481         int ret = 0;
3482         struct tevent_req *req;
3483         uint32_t generation;
3484
3485         if (argc != 4) {
3486                 usage(argv[0]);
3487                 exit(1);
3488         }
3489
3490         write_fd = atoi(argv[1]);
3491         sockpath = argv[2];
3492         generation = (uint32_t)smb_strtoul(argv[3],
3493                                            NULL,
3494                                            0,
3495                                            &ret,
3496                                            SMB_STR_STANDARD);
3497         if (ret != 0) {
3498                 fprintf(stderr, "recovery: unable to initialize generation\n");
3499                 goto failed;
3500         }
3501
3502         mem_ctx = talloc_new(NULL);
3503         if (mem_ctx == NULL) {
3504                 fprintf(stderr, "recovery: talloc_new() failed\n");
3505                 goto failed;
3506         }
3507
3508         ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3509         if (ret != 0) {
3510                 fprintf(stderr, "recovery: Unable to initialize logging\n");
3511                 goto failed;
3512         }
3513
3514         ev = tevent_context_init(mem_ctx);
3515         if (ev == NULL) {
3516                 D_ERR("tevent_context_init() failed\n");
3517                 goto failed;
3518         }
3519
3520         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3521         if (ret != 0) {
3522                 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3523                 goto failed;
3524         }
3525
3526         req = recovery_send(mem_ctx, ev, client, generation);
3527         if (req == NULL) {
3528                 D_ERR("database_recover_send() failed\n");
3529                 goto failed;
3530         }
3531
3532         if (! tevent_req_poll(req, ev)) {
3533                 D_ERR("tevent_req_poll() failed\n");
3534                 goto failed;
3535         }
3536
3537         recovery_recv(req, &ret);
3538         TALLOC_FREE(req);
3539         if (ret != 0) {
3540                 D_ERR("database recovery failed, ret=%d\n", ret);
3541                 goto failed;
3542         }
3543
3544         sys_write(write_fd, &ret, sizeof(ret));
3545         return 0;
3546
3547 failed:
3548         TALLOC_FREE(mem_ctx);
3549         return 1;
3550 }