ctdb-recoverd: Fold IP allocation house-keeping into IP verification
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/util_process.h"
36
37 #include "ctdb_private.h"
38 #include "ctdb_client.h"
39
40 #include "common/system.h"
41 #include "common/cmdline.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45 #include "ctdb_cluster_mutex.h"
46
47 /* List of SRVID requests that need to be processed */
48 struct srvid_list {
49         struct srvid_list *next, *prev;
50         struct ctdb_srvid_message *request;
51 };
52
53 struct srvid_requests {
54         struct srvid_list *requests;
55 };
56
57 static void srvid_request_reply(struct ctdb_context *ctdb,
58                                 struct ctdb_srvid_message *request,
59                                 TDB_DATA result)
60 {
61         /* Someone that sent srvid==0 does not want a reply */
62         if (request->srvid == 0) {
63                 talloc_free(request);
64                 return;
65         }
66
67         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
68                                      result) == 0) {
69                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
70                                   (unsigned)request->pnn,
71                                   (unsigned long long)request->srvid));
72         } else {
73                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
74                                  (unsigned)request->pnn,
75                                  (unsigned long long)request->srvid));
76         }
77
78         talloc_free(request);
79 }
80
81 static void srvid_requests_reply(struct ctdb_context *ctdb,
82                                  struct srvid_requests **requests,
83                                  TDB_DATA result)
84 {
85         struct srvid_list *r;
86
87         for (r = (*requests)->requests; r != NULL; r = r->next) {
88                 srvid_request_reply(ctdb, r->request, result);
89         }
90
91         /* Free the list structure... */
92         TALLOC_FREE(*requests);
93 }
94
95 static void srvid_request_add(struct ctdb_context *ctdb,
96                               struct srvid_requests **requests,
97                               struct ctdb_srvid_message *request)
98 {
99         struct srvid_list *t;
100         int32_t ret;
101         TDB_DATA result;
102
103         if (*requests == NULL) {
104                 *requests = talloc_zero(ctdb, struct srvid_requests);
105                 if (*requests == NULL) {
106                         goto nomem;
107                 }
108         }
109
110         t = talloc_zero(*requests, struct srvid_list);
111         if (t == NULL) {
112                 /* If *requests was just allocated above then free it */
113                 if ((*requests)->requests == NULL) {
114                         TALLOC_FREE(*requests);
115                 }
116                 goto nomem;
117         }
118
119         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
120         DLIST_ADD((*requests)->requests, t);
121
122         return;
123
124 nomem:
125         /* Failed to add the request to the list.  Send a fail. */
126         DEBUG(DEBUG_ERR, (__location__
127                           " Out of memory, failed to queue SRVID request\n"));
128         ret = -ENOMEM;
129         result.dsize = sizeof(ret);
130         result.dptr = (uint8_t *)&ret;
131         srvid_request_reply(ctdb, request, result);
132 }
133
134 /* An abstraction to allow an operation (takeover runs, recoveries,
135  * ...) to be disabled for a given timeout */
136 struct ctdb_op_state {
137         struct tevent_timer *timer;
138         bool in_progress;
139         const char *name;
140 };
141
142 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
143 {
144         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
145
146         if (state != NULL) {
147                 state->in_progress = false;
148                 state->name = name;
149         }
150
151         return state;
152 }
153
154 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
155 {
156         return state->timer != NULL;
157 }
158
159 static bool ctdb_op_begin(struct ctdb_op_state *state)
160 {
161         if (ctdb_op_is_disabled(state)) {
162                 DEBUG(DEBUG_NOTICE,
163                       ("Unable to begin - %s are disabled\n", state->name));
164                 return false;
165         }
166
167         state->in_progress = true;
168         return true;
169 }
170
171 static bool ctdb_op_end(struct ctdb_op_state *state)
172 {
173         return state->in_progress = false;
174 }
175
176 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
177 {
178         return state->in_progress;
179 }
180
181 static void ctdb_op_enable(struct ctdb_op_state *state)
182 {
183         TALLOC_FREE(state->timer);
184 }
185
186 static void ctdb_op_timeout_handler(struct tevent_context *ev,
187                                     struct tevent_timer *te,
188                                     struct timeval yt, void *p)
189 {
190         struct ctdb_op_state *state =
191                 talloc_get_type(p, struct ctdb_op_state);
192
193         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
194         ctdb_op_enable(state);
195 }
196
197 static int ctdb_op_disable(struct ctdb_op_state *state,
198                            struct tevent_context *ev,
199                            uint32_t timeout)
200 {
201         if (timeout == 0) {
202                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
203                 ctdb_op_enable(state);
204                 return 0;
205         }
206
207         if (state->in_progress) {
208                 DEBUG(DEBUG_ERR,
209                       ("Unable to disable %s - in progress\n", state->name));
210                 return -EAGAIN;
211         }
212
213         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
214                             state->name, timeout));
215
216         /* Clear any old timers */
217         talloc_free(state->timer);
218
219         /* Arrange for the timeout to occur */
220         state->timer = tevent_add_timer(ev, state,
221                                         timeval_current_ofs(timeout, 0),
222                                         ctdb_op_timeout_handler, state);
223         if (state->timer == NULL) {
224                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
225                 return -ENOMEM;
226         }
227
228         return 0;
229 }
230
231 struct ctdb_banning_state {
232         uint32_t count;
233         struct timeval last_reported_time;
234 };
235
236 /*
237   private state of recovery daemon
238  */
239 struct ctdb_recoverd {
240         struct ctdb_context *ctdb;
241         uint32_t recmaster;
242         uint32_t last_culprit_node;
243         struct ctdb_node_map_old *nodemap;
244         struct timeval priority_time;
245         bool need_takeover_run;
246         bool need_recovery;
247         uint32_t node_flags;
248         struct tevent_timer *send_election_te;
249         struct tevent_timer *election_timeout;
250         struct srvid_requests *reallocate_requests;
251         struct ctdb_op_state *takeover_run;
252         struct ctdb_op_state *recovery;
253         struct ctdb_iface_list_old *ifaces;
254         uint32_t *force_rebalance_nodes;
255         struct ctdb_node_capabilities *caps;
256 };
257
258 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
259 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
260
261 static void ctdb_restart_recd(struct tevent_context *ev,
262                               struct tevent_timer *te, struct timeval t,
263                               void *private_data);
264
265 /*
266   ban a node for a period of time
267  */
268 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
269 {
270         int ret;
271         struct ctdb_context *ctdb = rec->ctdb;
272         struct ctdb_ban_state bantime;
273
274         if (!ctdb_validate_pnn(ctdb, pnn)) {
275                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
276                 return;
277         }
278
279         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
280
281         bantime.pnn  = pnn;
282         bantime.time = ban_time;
283
284         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
285         if (ret != 0) {
286                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
287                 return;
288         }
289
290 }
291
292 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
293
294
295 /*
296   remember the trouble maker
297  */
298 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
299 {
300         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
301         struct ctdb_banning_state *ban_state;
302
303         if (culprit > ctdb->num_nodes) {
304                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
305                 return;
306         }
307
308         /* If we are banned or stopped, do not set other nodes as culprits */
309         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
310                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
311                 return;
312         }
313
314         if (ctdb->nodes[culprit]->ban_state == NULL) {
315                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
316                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
317
318                 
319         }
320         ban_state = ctdb->nodes[culprit]->ban_state;
321         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
322                 /* this was the first time in a long while this node
323                    misbehaved so we will forgive any old transgressions.
324                 */
325                 ban_state->count = 0;
326         }
327
328         ban_state->count += count;
329         ban_state->last_reported_time = timeval_current();
330         rec->last_culprit_node = culprit;
331 }
332
333 /*
334   remember the trouble maker
335  */
336 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
337 {
338         ctdb_set_culprit_count(rec, culprit, 1);
339 }
340
341
342 /* this callback is called for every node that failed to execute the
343    recovered event
344 */
345 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
346 {
347         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
348
349         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
350
351         ctdb_set_culprit(rec, node_pnn);
352 }
353
354 /*
355   run the "recovered" eventscript on all nodes
356  */
357 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
358 {
359         TALLOC_CTX *tmp_ctx;
360         uint32_t *nodes;
361         struct ctdb_context *ctdb = rec->ctdb;
362
363         tmp_ctx = talloc_new(ctdb);
364         CTDB_NO_MEMORY(ctdb, tmp_ctx);
365
366         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
367         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
368                                         nodes, 0,
369                                         CONTROL_TIMEOUT(), false, tdb_null,
370                                         NULL, recovered_fail_callback,
371                                         rec) != 0) {
372                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
373
374                 talloc_free(tmp_ctx);
375                 return -1;
376         }
377
378         talloc_free(tmp_ctx);
379         return 0;
380 }
381
382 /* this callback is called for every node that failed to execute the
383    start recovery event
384 */
385 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
386 {
387         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
388
389         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
390
391         ctdb_set_culprit(rec, node_pnn);
392 }
393
394 /*
395   run the "startrecovery" eventscript on all nodes
396  */
397 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
398 {
399         TALLOC_CTX *tmp_ctx;
400         uint32_t *nodes;
401         struct ctdb_context *ctdb = rec->ctdb;
402
403         tmp_ctx = talloc_new(ctdb);
404         CTDB_NO_MEMORY(ctdb, tmp_ctx);
405
406         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
407         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
408                                         nodes, 0,
409                                         CONTROL_TIMEOUT(), false, tdb_null,
410                                         NULL,
411                                         startrecovery_fail_callback,
412                                         rec) != 0) {
413                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
414                 talloc_free(tmp_ctx);
415                 return -1;
416         }
417
418         talloc_free(tmp_ctx);
419         return 0;
420 }
421
422 /*
423   Retrieve capabilities from all connected nodes
424  */
425 static int update_capabilities(struct ctdb_recoverd *rec,
426                                struct ctdb_node_map_old *nodemap)
427 {
428         uint32_t *capp;
429         TALLOC_CTX *tmp_ctx;
430         struct ctdb_node_capabilities *caps;
431         struct ctdb_context *ctdb = rec->ctdb;
432
433         tmp_ctx = talloc_new(rec);
434         CTDB_NO_MEMORY(ctdb, tmp_ctx);
435
436         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
437                                      CONTROL_TIMEOUT(), nodemap);
438
439         if (caps == NULL) {
440                 DEBUG(DEBUG_ERR,
441                       (__location__ " Failed to get node capabilities\n"));
442                 talloc_free(tmp_ctx);
443                 return -1;
444         }
445
446         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
447         if (capp == NULL) {
448                 DEBUG(DEBUG_ERR,
449                       (__location__
450                        " Capabilities don't include current node.\n"));
451                 talloc_free(tmp_ctx);
452                 return -1;
453         }
454         ctdb->capabilities = *capp;
455
456         TALLOC_FREE(rec->caps);
457         rec->caps = talloc_steal(rec, caps);
458
459         talloc_free(tmp_ctx);
460         return 0;
461 }
462
463 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
464 {
465         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
466
467         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
468         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
469 }
470
471 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
472 {
473         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
474
475         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
476         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
477 }
478
479 /*
480   change recovery mode on all nodes
481  */
482 static int set_recovery_mode(struct ctdb_context *ctdb,
483                              struct ctdb_recoverd *rec,
484                              struct ctdb_node_map_old *nodemap,
485                              uint32_t rec_mode, bool freeze)
486 {
487         TDB_DATA data;
488         uint32_t *nodes;
489         TALLOC_CTX *tmp_ctx;
490
491         tmp_ctx = talloc_new(ctdb);
492         CTDB_NO_MEMORY(ctdb, tmp_ctx);
493
494         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
495
496         data.dsize = sizeof(uint32_t);
497         data.dptr = (unsigned char *)&rec_mode;
498
499         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
500                                         nodes, 0,
501                                         CONTROL_TIMEOUT(),
502                                         false, data,
503                                         NULL, NULL,
504                                         NULL) != 0) {
505                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
506                 talloc_free(tmp_ctx);
507                 return -1;
508         }
509
510         /* freeze all nodes */
511         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
512                 int i;
513
514                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
515                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
516                                                 nodes, i,
517                                                 CONTROL_TIMEOUT(),
518                                                 false, tdb_null,
519                                                 NULL,
520                                                 set_recmode_fail_callback,
521                                                 rec) != 0) {
522                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
523                                 talloc_free(tmp_ctx);
524                                 return -1;
525                         }
526                 }
527         }
528
529         talloc_free(tmp_ctx);
530         return 0;
531 }
532
533 /* update all remote nodes to use the same db priority that we have
534    this can fail if the remove node has not yet been upgraded to 
535    support this function, so we always return success and never fail
536    a recovery if this call fails.
537 */
538 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
539         struct ctdb_node_map_old *nodemap, 
540         uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
541 {
542         int db;
543
544         /* step through all local databases */
545         for (db=0; db<dbmap->num;db++) {
546                 struct ctdb_db_priority db_prio;
547                 int ret;
548
549                 db_prio.db_id     = dbmap->dbs[db].db_id;
550                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
551                 if (ret != 0) {
552                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
553                         continue;
554                 }
555
556                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
557
558                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
559                                                 CTDB_CURRENT_NODE, &db_prio);
560                 if (ret != 0) {
561                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
562                                          db_prio.db_id));
563                 }
564         }
565
566         return 0;
567 }                       
568
569 /*
570   ensure all other nodes have attached to any databases that we have
571  */
572 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
573                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
574 {
575         int i, j, db, ret;
576         struct ctdb_dbid_map_old *remote_dbmap;
577
578         /* verify that all other nodes have all our databases */
579         for (j=0; j<nodemap->num; j++) {
580                 /* we don't need to ourself ourselves */
581                 if (nodemap->nodes[j].pnn == pnn) {
582                         continue;
583                 }
584                 /* don't check nodes that are unavailable */
585                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
586                         continue;
587                 }
588
589                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
590                                          mem_ctx, &remote_dbmap);
591                 if (ret != 0) {
592                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
593                         return -1;
594                 }
595
596                 /* step through all local databases */
597                 for (db=0; db<dbmap->num;db++) {
598                         const char *name;
599
600
601                         for (i=0;i<remote_dbmap->num;i++) {
602                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
603                                         break;
604                                 }
605                         }
606                         /* the remote node already have this database */
607                         if (i!=remote_dbmap->num) {
608                                 continue;
609                         }
610                         /* ok so we need to create this database */
611                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
612                                                   dbmap->dbs[db].db_id, mem_ctx,
613                                                   &name);
614                         if (ret != 0) {
615                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
616                                 return -1;
617                         }
618                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
619                                                  nodemap->nodes[j].pnn,
620                                                  mem_ctx, name,
621                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
622                         if (ret != 0) {
623                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
624                                 return -1;
625                         }
626                 }
627         }
628
629         return 0;
630 }
631
632
633 /*
634   ensure we are attached to any databases that anyone else is attached to
635  */
636 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
637                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
638 {
639         int i, j, db, ret;
640         struct ctdb_dbid_map_old *remote_dbmap;
641
642         /* verify that we have all database any other node has */
643         for (j=0; j<nodemap->num; j++) {
644                 /* we don't need to ourself ourselves */
645                 if (nodemap->nodes[j].pnn == pnn) {
646                         continue;
647                 }
648                 /* don't check nodes that are unavailable */
649                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
650                         continue;
651                 }
652
653                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
654                                          mem_ctx, &remote_dbmap);
655                 if (ret != 0) {
656                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
657                         return -1;
658                 }
659
660                 /* step through all databases on the remote node */
661                 for (db=0; db<remote_dbmap->num;db++) {
662                         const char *name;
663
664                         for (i=0;i<(*dbmap)->num;i++) {
665                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
666                                         break;
667                                 }
668                         }
669                         /* we already have this db locally */
670                         if (i!=(*dbmap)->num) {
671                                 continue;
672                         }
673                         /* ok so we need to create this database and
674                            rebuild dbmap
675                          */
676                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
677                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
678                         if (ret != 0) {
679                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
680                                           nodemap->nodes[j].pnn));
681                                 return -1;
682                         }
683                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
684                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
685                         if (ret != 0) {
686                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
687                                 return -1;
688                         }
689                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
690                         if (ret != 0) {
691                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
692                                 return -1;
693                         }
694                 }
695         }
696
697         return 0;
698 }
699
700
701 /*
702   pull the remote database contents from one node into the recdb
703  */
704 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
705                                     struct tdb_wrap *recdb, uint32_t dbid)
706 {
707         int ret;
708         TDB_DATA outdata;
709         struct ctdb_marshall_buffer *reply;
710         struct ctdb_rec_data_old *recdata;
711         int i;
712         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
713
714         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
715                                CONTROL_TIMEOUT(), &outdata);
716         if (ret != 0) {
717                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
723
724         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
725                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
726                 talloc_free(tmp_ctx);
727                 return -1;
728         }
729
730         recdata = (struct ctdb_rec_data_old *)&reply->data[0];
731
732         for (i=0;
733              i<reply->count;
734              recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
735                 TDB_DATA key, data;
736                 struct ctdb_ltdb_header *hdr;
737                 TDB_DATA existing;
738
739                 key.dptr = &recdata->data[0];
740                 key.dsize = recdata->keylen;
741                 data.dptr = &recdata->data[key.dsize];
742                 data.dsize = recdata->datalen;
743
744                 hdr = (struct ctdb_ltdb_header *)data.dptr;
745
746                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
747                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
748                         talloc_free(tmp_ctx);
749                         return -1;
750                 }
751
752                 /* fetch the existing record, if any */
753                 existing = tdb_fetch(recdb->tdb, key);
754
755                 if (existing.dptr != NULL) {
756                         struct ctdb_ltdb_header header;
757                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
758                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
759                                          (unsigned)existing.dsize, srcnode));
760                                 free(existing.dptr);
761                                 talloc_free(tmp_ctx);
762                                 return -1;
763                         }
764                         header = *(struct ctdb_ltdb_header *)existing.dptr;
765                         free(existing.dptr);
766                         if (!(header.rsn < hdr->rsn ||
767                               (header.dmaster != ctdb_get_pnn(ctdb) &&
768                                header.rsn == hdr->rsn))) {
769                                 continue;
770                         }
771                 }
772
773                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
774                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
775                         talloc_free(tmp_ctx);
776                         return -1;
777                 }
778         }
779
780         talloc_free(tmp_ctx);
781
782         return 0;
783 }
784
785
786 struct pull_seqnum_cbdata {
787         int failed;
788         uint32_t pnn;
789         uint64_t seqnum;
790 };
791
792 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
793 {
794         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
795         uint64_t seqnum;
796
797         if (cb_data->failed != 0) {
798                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
799                 return;
800         }
801
802         if (res != 0) {
803                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
804                 cb_data->failed = 1;
805                 return;
806         }
807
808         if (outdata.dsize != sizeof(uint64_t)) {
809                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
810                 cb_data->failed = -1;
811                 return;
812         }
813
814         seqnum = *((uint64_t *)outdata.dptr);
815
816         if (seqnum > cb_data->seqnum ||
817             (cb_data->pnn == -1 && seqnum == 0)) {
818                 cb_data->seqnum = seqnum;
819                 cb_data->pnn = node_pnn;
820         }
821 }
822
823 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
824 {
825         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
826
827         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
828         cb_data->failed = 1;
829 }
830
831 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
832                                 struct ctdb_recoverd *rec, 
833                                 struct ctdb_node_map_old *nodemap, 
834                                 struct tdb_wrap *recdb, uint32_t dbid)
835 {
836         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
837         uint32_t *nodes;
838         TDB_DATA data;
839         uint32_t outdata[2];
840         struct pull_seqnum_cbdata *cb_data;
841
842         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
843
844         outdata[0] = dbid;
845         outdata[1] = 0;
846
847         data.dsize = sizeof(outdata);
848         data.dptr  = (uint8_t *)&outdata[0];
849
850         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
851         if (cb_data == NULL) {
852                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
853                 talloc_free(tmp_ctx);
854                 return -1;
855         }
856
857         cb_data->failed = 0;
858         cb_data->pnn    = -1;
859         cb_data->seqnum = 0;
860         
861         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
862         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
863                                         nodes, 0,
864                                         CONTROL_TIMEOUT(), false, data,
865                                         pull_seqnum_cb,
866                                         pull_seqnum_fail_cb,
867                                         cb_data) != 0) {
868                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
869
870                 talloc_free(tmp_ctx);
871                 return -1;
872         }
873
874         if (cb_data->failed != 0) {
875                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
876                 talloc_free(tmp_ctx);
877                 return -1;
878         }
879
880         if (cb_data->pnn == -1) {
881                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
882                 talloc_free(tmp_ctx);
883                 return -1;
884         }
885
886         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
887
888         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
889                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
890                 talloc_free(tmp_ctx);
891                 return -1;
892         }
893
894         talloc_free(tmp_ctx);
895         return 0;
896 }
897
898
899 /*
900   pull all the remote database contents into the recdb
901  */
902 static int pull_remote_database(struct ctdb_context *ctdb,
903                                 struct ctdb_recoverd *rec, 
904                                 struct ctdb_node_map_old *nodemap, 
905                                 struct tdb_wrap *recdb, uint32_t dbid,
906                                 bool persistent)
907 {
908         int j;
909
910         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
911                 int ret;
912                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
913                 if (ret == 0) {
914                         return 0;
915                 }
916         }
917
918         /* pull all records from all other nodes across onto this node
919            (this merges based on rsn)
920         */
921         for (j=0; j<nodemap->num; j++) {
922                 /* don't merge from nodes that are unavailable */
923                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
924                         continue;
925                 }
926                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
927                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
928                                  nodemap->nodes[j].pnn));
929                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
930                         return -1;
931                 }
932         }
933         
934         return 0;
935 }
936
937
938 /*
939   update flags on all active nodes
940  */
941 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
942 {
943         int ret;
944
945         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
946                 if (ret != 0) {
947                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
948                 return -1;
949         }
950
951         return 0;
952 }
953
954 /*
955   ensure all nodes have the same vnnmap we do
956  */
957 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
958                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
959 {
960         int j, ret;
961
962         /* push the new vnn map out to all the nodes */
963         for (j=0; j<nodemap->num; j++) {
964                 /* don't push to nodes that are unavailable */
965                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
966                         continue;
967                 }
968
969                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
970                 if (ret != 0) {
971                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
972                         return -1;
973                 }
974         }
975
976         return 0;
977 }
978
979
980 /*
981   called when a vacuum fetch has completed - just free it and do the next one
982  */
983 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
984 {
985         talloc_free(state);
986 }
987
988
989 /**
990  * Process one elements of the vacuum fetch list:
991  * Migrate it over to us with the special flag
992  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
993  */
994 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
995                                      uint32_t pnn,
996                                      struct ctdb_rec_data_old *r)
997 {
998         struct ctdb_client_call_state *state;
999         TDB_DATA data;
1000         struct ctdb_ltdb_header *hdr;
1001         struct ctdb_call call;
1002
1003         ZERO_STRUCT(call);
1004         call.call_id = CTDB_NULL_FUNC;
1005         call.flags = CTDB_IMMEDIATE_MIGRATION;
1006         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1007
1008         call.key.dptr = &r->data[0];
1009         call.key.dsize = r->keylen;
1010
1011         /* ensure we don't block this daemon - just skip a record if we can't get
1012            the chainlock */
1013         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1014                 return true;
1015         }
1016
1017         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1018         if (data.dptr == NULL) {
1019                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1020                 return true;
1021         }
1022
1023         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1024                 free(data.dptr);
1025                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1026                 return true;
1027         }
1028
1029         hdr = (struct ctdb_ltdb_header *)data.dptr;
1030         if (hdr->dmaster == pnn) {
1031                 /* its already local */
1032                 free(data.dptr);
1033                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1034                 return true;
1035         }
1036
1037         free(data.dptr);
1038
1039         state = ctdb_call_send(ctdb_db, &call);
1040         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1041         if (state == NULL) {
1042                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1043                 return false;
1044         }
1045         state->async.fn = vacuum_fetch_callback;
1046         state->async.private_data = NULL;
1047
1048         return true;
1049 }
1050
1051
1052 /*
1053   handler for vacuum fetch
1054 */
1055 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1056                                  void *private_data)
1057 {
1058         struct ctdb_recoverd *rec = talloc_get_type(
1059                 private_data, struct ctdb_recoverd);
1060         struct ctdb_context *ctdb = rec->ctdb;
1061         struct ctdb_marshall_buffer *recs;
1062         int ret, i;
1063         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1064         const char *name;
1065         struct ctdb_dbid_map_old *dbmap=NULL;
1066         bool persistent = false;
1067         struct ctdb_db_context *ctdb_db;
1068         struct ctdb_rec_data_old *r;
1069
1070         recs = (struct ctdb_marshall_buffer *)data.dptr;
1071
1072         if (recs->count == 0) {
1073                 goto done;
1074         }
1075
1076         /* work out if the database is persistent */
1077         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1078         if (ret != 0) {
1079                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1080                 goto done;
1081         }
1082
1083         for (i=0;i<dbmap->num;i++) {
1084                 if (dbmap->dbs[i].db_id == recs->db_id) {
1085                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1086                         break;
1087                 }
1088         }
1089         if (i == dbmap->num) {
1090                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1091                 goto done;
1092         }
1093
1094         /* find the name of this database */
1095         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1096                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1097                 goto done;
1098         }
1099
1100         /* attach to it */
1101         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1102         if (ctdb_db == NULL) {
1103                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1104                 goto done;
1105         }
1106
1107         r = (struct ctdb_rec_data_old *)&recs->data[0];
1108         while (recs->count) {
1109                 bool ok;
1110
1111                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1112                 if (!ok) {
1113                         break;
1114                 }
1115
1116                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
1117                 recs->count--;
1118         }
1119
1120 done:
1121         talloc_free(tmp_ctx);
1122 }
1123
1124
1125 /*
1126  * handler for database detach
1127  */
1128 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1129                                     void *private_data)
1130 {
1131         struct ctdb_recoverd *rec = talloc_get_type(
1132                 private_data, struct ctdb_recoverd);
1133         struct ctdb_context *ctdb = rec->ctdb;
1134         uint32_t db_id;
1135         struct ctdb_db_context *ctdb_db;
1136
1137         if (data.dsize != sizeof(db_id)) {
1138                 return;
1139         }
1140         db_id = *(uint32_t *)data.dptr;
1141
1142         ctdb_db = find_ctdb_db(ctdb, db_id);
1143         if (ctdb_db == NULL) {
1144                 /* database is not attached */
1145                 return;
1146         }
1147
1148         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1149
1150         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1151                              ctdb_db->db_name));
1152         talloc_free(ctdb_db);
1153 }
1154
1155 /*
1156   called when ctdb_wait_timeout should finish
1157  */
1158 static void ctdb_wait_handler(struct tevent_context *ev,
1159                               struct tevent_timer *te,
1160                               struct timeval yt, void *p)
1161 {
1162         uint32_t *timed_out = (uint32_t *)p;
1163         (*timed_out) = 1;
1164 }
1165
1166 /*
1167   wait for a given number of seconds
1168  */
1169 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1170 {
1171         uint32_t timed_out = 0;
1172         time_t usecs = (secs - (time_t)secs) * 1000000;
1173         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
1174                          ctdb_wait_handler, &timed_out);
1175         while (!timed_out) {
1176                 tevent_loop_once(ctdb->ev);
1177         }
1178 }
1179
1180 /*
1181   called when an election times out (ends)
1182  */
1183 static void ctdb_election_timeout(struct tevent_context *ev,
1184                                   struct tevent_timer *te,
1185                                   struct timeval t, void *p)
1186 {
1187         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1188         rec->election_timeout = NULL;
1189         fast_start = false;
1190
1191         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1192 }
1193
1194
1195 /*
1196   wait for an election to finish. It finished election_timeout seconds after
1197   the last election packet is received
1198  */
1199 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1200 {
1201         struct ctdb_context *ctdb = rec->ctdb;
1202         while (rec->election_timeout) {
1203                 tevent_loop_once(ctdb->ev);
1204         }
1205 }
1206
1207 /*
1208   Update our local flags from all remote connected nodes. 
1209   This is only run when we are or we belive we are the recovery master
1210  */
1211 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
1212 {
1213         int j;
1214         struct ctdb_context *ctdb = rec->ctdb;
1215         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1216
1217         /* get the nodemap for all active remote nodes and verify
1218            they are the same as for this node
1219          */
1220         for (j=0; j<nodemap->num; j++) {
1221                 struct ctdb_node_map_old *remote_nodemap=NULL;
1222                 int ret;
1223
1224                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1225                         continue;
1226                 }
1227                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1228                         continue;
1229                 }
1230
1231                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1232                                            mem_ctx, &remote_nodemap);
1233                 if (ret != 0) {
1234                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1235                                   nodemap->nodes[j].pnn));
1236                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1237                         talloc_free(mem_ctx);
1238                         return -1;
1239                 }
1240                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1241                         /* We should tell our daemon about this so it
1242                            updates its flags or else we will log the same 
1243                            message again in the next iteration of recovery.
1244                            Since we are the recovery master we can just as
1245                            well update the flags on all nodes.
1246                         */
1247                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1248                         if (ret != 0) {
1249                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1250                                 return -1;
1251                         }
1252
1253                         /* Update our local copy of the flags in the recovery
1254                            daemon.
1255                         */
1256                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1257                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1258                                  nodemap->nodes[j].flags));
1259                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1260                 }
1261                 talloc_free(remote_nodemap);
1262         }
1263         talloc_free(mem_ctx);
1264         return 0;
1265 }
1266
1267
1268 /* Create a new random generation id.
1269    The generation id can not be the INVALID_GENERATION id
1270 */
1271 static uint32_t new_generation(void)
1272 {
1273         uint32_t generation;
1274
1275         while (1) {
1276                 generation = random();
1277
1278                 if (generation != INVALID_GENERATION) {
1279                         break;
1280                 }
1281         }
1282
1283         return generation;
1284 }
1285
1286
1287 /*
1288   create a temporary working database
1289  */
1290 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1291 {
1292         char *name;
1293         struct tdb_wrap *recdb;
1294         unsigned tdb_flags;
1295
1296         /* open up the temporary recovery database */
1297         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1298                                ctdb->db_directory_state,
1299                                ctdb->pnn);
1300         if (name == NULL) {
1301                 return NULL;
1302         }
1303         unlink(name);
1304
1305         tdb_flags = TDB_NOLOCK;
1306         if (ctdb->valgrinding) {
1307                 tdb_flags |= TDB_NOMMAP;
1308         }
1309         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1310
1311         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1312                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1313         if (recdb == NULL) {
1314                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1315         }
1316
1317         talloc_free(name);
1318
1319         return recdb;
1320 }
1321
1322
1323 /* 
1324    a traverse function for pulling all relevant records from recdb
1325  */
1326 struct recdb_data {
1327         struct ctdb_context *ctdb;
1328         struct ctdb_marshall_buffer *recdata;
1329         uint32_t len;
1330         uint32_t allocated_len;
1331         bool failed;
1332         bool persistent;
1333 };
1334
1335 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1336 {
1337         struct recdb_data *params = (struct recdb_data *)p;
1338         struct ctdb_rec_data_old *recdata;
1339         struct ctdb_ltdb_header *hdr;
1340
1341         /*
1342          * skip empty records - but NOT for persistent databases:
1343          *
1344          * The record-by-record mode of recovery deletes empty records.
1345          * For persistent databases, this can lead to data corruption
1346          * by deleting records that should be there:
1347          *
1348          * - Assume the cluster has been running for a while.
1349          *
1350          * - A record R in a persistent database has been created and
1351          *   deleted a couple of times, the last operation being deletion,
1352          *   leaving an empty record with a high RSN, say 10.
1353          *
1354          * - Now a node N is turned off.
1355          *
1356          * - This leaves the local database copy of D on N with the empty
1357          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1358          *   the copy of record R.
1359          *
1360          * - Now the record is created again while node N is turned off.
1361          *   This creates R with RSN = 1 on all nodes except for N.
1362          *
1363          * - Now node N is turned on again. The following recovery will chose
1364          *   the older empty copy of R due to RSN 10 > RSN 1.
1365          *
1366          * ==> Hence the record is gone after the recovery.
1367          *
1368          * On databases like Samba's registry, this can damage the higher-level
1369          * data structures built from the various tdb-level records.
1370          */
1371         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1372                 return 0;
1373         }
1374
1375         /* update the dmaster field to point to us */
1376         hdr = (struct ctdb_ltdb_header *)data.dptr;
1377         if (!params->persistent) {
1378                 hdr->dmaster = params->ctdb->pnn;
1379                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1380         }
1381
1382         /* add the record to the blob ready to send to the nodes */
1383         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1384         if (recdata == NULL) {
1385                 params->failed = true;
1386                 return -1;
1387         }
1388         if (params->len + recdata->length >= params->allocated_len) {
1389                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1390                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1391         }
1392         if (params->recdata == NULL) {
1393                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1394                          recdata->length + params->len));
1395                 params->failed = true;
1396                 return -1;
1397         }
1398         params->recdata->count++;
1399         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1400         params->len += recdata->length;
1401         talloc_free(recdata);
1402
1403         return 0;
1404 }
1405
1406 /*
1407   push the recdb database out to all nodes
1408  */
1409 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1410                                bool persistent,
1411                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1412 {
1413         struct recdb_data params;
1414         struct ctdb_marshall_buffer *recdata;
1415         TDB_DATA outdata;
1416         TALLOC_CTX *tmp_ctx;
1417         uint32_t *nodes;
1418
1419         tmp_ctx = talloc_new(ctdb);
1420         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1421
1422         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1423         CTDB_NO_MEMORY(ctdb, recdata);
1424
1425         recdata->db_id = dbid;
1426
1427         params.ctdb = ctdb;
1428         params.recdata = recdata;
1429         params.len = offsetof(struct ctdb_marshall_buffer, data);
1430         params.allocated_len = params.len;
1431         params.failed = false;
1432         params.persistent = persistent;
1433
1434         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1435                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1436                 talloc_free(params.recdata);
1437                 talloc_free(tmp_ctx);
1438                 return -1;
1439         }
1440
1441         if (params.failed) {
1442                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1443                 talloc_free(params.recdata);
1444                 talloc_free(tmp_ctx);
1445                 return -1;              
1446         }
1447
1448         recdata = params.recdata;
1449
1450         outdata.dptr = (void *)recdata;
1451         outdata.dsize = params.len;
1452
1453         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1454         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1455                                         nodes, 0,
1456                                         CONTROL_TIMEOUT(), false, outdata,
1457                                         NULL, NULL,
1458                                         NULL) != 0) {
1459                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1460                 talloc_free(recdata);
1461                 talloc_free(tmp_ctx);
1462                 return -1;
1463         }
1464
1465         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1466                   dbid, recdata->count));
1467
1468         talloc_free(recdata);
1469         talloc_free(tmp_ctx);
1470
1471         return 0;
1472 }
1473
1474
1475 /*
1476   go through a full recovery on one database 
1477  */
1478 static int recover_database(struct ctdb_recoverd *rec, 
1479                             TALLOC_CTX *mem_ctx,
1480                             uint32_t dbid,
1481                             bool persistent,
1482                             uint32_t pnn, 
1483                             struct ctdb_node_map_old *nodemap,
1484                             uint32_t transaction_id)
1485 {
1486         struct tdb_wrap *recdb;
1487         int ret;
1488         struct ctdb_context *ctdb = rec->ctdb;
1489         TDB_DATA data;
1490         struct ctdb_transdb w;
1491         uint32_t *nodes;
1492
1493         recdb = create_recdb(ctdb, mem_ctx);
1494         if (recdb == NULL) {
1495                 return -1;
1496         }
1497
1498         /* pull all remote databases onto the recdb */
1499         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1500         if (ret != 0) {
1501                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1502                 return -1;
1503         }
1504
1505         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1506
1507         /* wipe all the remote databases. This is safe as we are in a transaction */
1508         w.db_id = dbid;
1509         w.tid = transaction_id;
1510
1511         data.dptr = (void *)&w;
1512         data.dsize = sizeof(w);
1513
1514         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1515         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1516                                         nodes, 0,
1517                                         CONTROL_TIMEOUT(), false, data,
1518                                         NULL, NULL,
1519                                         NULL) != 0) {
1520                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1521                 talloc_free(recdb);
1522                 return -1;
1523         }
1524         
1525         /* push out the correct database. This sets the dmaster and skips 
1526            the empty records */
1527         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1528         if (ret != 0) {
1529                 talloc_free(recdb);
1530                 return -1;
1531         }
1532
1533         /* all done with this database */
1534         talloc_free(recdb);
1535
1536         return 0;
1537 }
1538
1539 static bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
1540 {
1541         return (ctdb->recovery_lock_handle != NULL);
1542 }
1543
1544 struct hold_reclock_state {
1545         bool done;
1546         char status;
1547 };
1548
1549 static void hold_reclock_handler(struct ctdb_context *ctdb,
1550                                  char status,
1551                                  double latency,
1552                                  struct ctdb_cluster_mutex_handle *h,
1553                                  void *private_data)
1554 {
1555         struct hold_reclock_state *s =
1556                 (struct hold_reclock_state *) private_data;
1557
1558         switch (status) {
1559         case '0':
1560                 ctdb->recovery_lock_handle = h;
1561                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(),
1562                                                    latency);
1563                 break;
1564
1565         case '1':
1566                 DEBUG(DEBUG_ERR,
1567                       ("Unable to take recovery lock - contention\n"));
1568                 talloc_free(h);
1569                 break;
1570
1571         default:
1572                 DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
1573                 talloc_free(h);
1574         }
1575
1576         s->done = true;
1577         s->status = status;
1578 }
1579
1580 static bool ctdb_recovery_lock(struct ctdb_context *ctdb)
1581 {
1582         struct ctdb_cluster_mutex_handle *h;
1583         struct hold_reclock_state s = {
1584                 .done = false,
1585                 .status = '0',
1586         };
1587
1588         h = ctdb_cluster_mutex(ctdb, ctdb->recovery_lock_file, 0);
1589         if (h == NULL) {
1590                 return -1;
1591         }
1592
1593         ctdb_cluster_mutex_set_handler(h, hold_reclock_handler, &s);
1594
1595         while (!s.done) {
1596                 tevent_loop_once(ctdb->ev);
1597         }
1598
1599         /* Ensure no attempts to access to s after function return */
1600         ctdb_cluster_mutex_set_handler(h, hold_reclock_handler, NULL);
1601
1602         return (s.status == '0');
1603 }
1604
1605 static void ctdb_recovery_unlock(struct ctdb_context *ctdb)
1606 {
1607         if (ctdb->recovery_lock_handle != NULL) {
1608                 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
1609                 TALLOC_FREE(ctdb->recovery_lock_handle);
1610         }
1611 }
1612
1613 /* when we start a recovery, make sure all nodes use the same reclock file
1614    setting
1615 */
1616 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1617 {
1618         struct ctdb_context *ctdb = rec->ctdb;
1619         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1620         TDB_DATA data;
1621         uint32_t *nodes;
1622
1623         if (ctdb->recovery_lock_file == NULL) {
1624                 data.dptr  = NULL;
1625                 data.dsize = 0;
1626         } else {
1627                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1628                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1629         }
1630
1631         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1632         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1633                                         nodes, 0,
1634                                         CONTROL_TIMEOUT(),
1635                                         false, data,
1636                                         NULL, NULL,
1637                                         rec) != 0) {
1638                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1639                 talloc_free(tmp_ctx);
1640                 return -1;
1641         }
1642
1643         talloc_free(tmp_ctx);
1644         return 0;
1645 }
1646
1647
1648 /*
1649  * this callback is called for every node that failed to execute ctdb_takeover_run()
1650  * and set flag to re-run takeover run.
1651  */
1652 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1653 {
1654         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1655
1656         if (callback_data != NULL) {
1657                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1658
1659                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1660
1661                 ctdb_set_culprit(rec, node_pnn);
1662         }
1663 }
1664
1665
1666 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1667 {
1668         struct ctdb_context *ctdb = rec->ctdb;
1669         int i;
1670         struct ctdb_banning_state *ban_state;
1671
1672         *self_ban = false;
1673         for (i=0; i<ctdb->num_nodes; i++) {
1674                 if (ctdb->nodes[i]->ban_state == NULL) {
1675                         continue;
1676                 }
1677                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1678                 if (ban_state->count < 2*ctdb->num_nodes) {
1679                         continue;
1680                 }
1681
1682                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1683                         ctdb->nodes[i]->pnn, ban_state->count,
1684                         ctdb->tunable.recovery_ban_period));
1685                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1686                 ban_state->count = 0;
1687
1688                 /* Banning ourself? */
1689                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1690                         *self_ban = true;
1691                 }
1692         }
1693 }
1694
1695 static bool do_takeover_run(struct ctdb_recoverd *rec,
1696                             struct ctdb_node_map_old *nodemap,
1697                             bool banning_credits_on_fail)
1698 {
1699         uint32_t *nodes = NULL;
1700         struct ctdb_disable_message dtr;
1701         TDB_DATA data;
1702         int i;
1703         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1704         int ret;
1705         bool ok;
1706
1707         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1708
1709         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1710                 DEBUG(DEBUG_ERR, (__location__
1711                                   " takeover run already in progress \n"));
1712                 ok = false;
1713                 goto done;
1714         }
1715
1716         if (!ctdb_op_begin(rec->takeover_run)) {
1717                 ok = false;
1718                 goto done;
1719         }
1720
1721         /* Disable IP checks (takeover runs, really) on other nodes
1722          * while doing this takeover run.  This will stop those other
1723          * nodes from triggering takeover runs when think they should
1724          * be hosting an IP but it isn't yet on an interface.  Don't
1725          * wait for replies since a failure here might cause some
1726          * noise in the logs but will not actually cause a problem.
1727          */
1728         ZERO_STRUCT(dtr);
1729         dtr.srvid = 0; /* No reply */
1730         dtr.pnn = -1;
1731
1732         data.dptr  = (uint8_t*)&dtr;
1733         data.dsize = sizeof(dtr);
1734
1735         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1736
1737         /* Disable for 60 seconds.  This can be a tunable later if
1738          * necessary.
1739          */
1740         dtr.timeout = 60;
1741         for (i = 0; i < talloc_array_length(nodes); i++) {
1742                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1743                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1744                                              data) != 0) {
1745                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1746                 }
1747         }
1748
1749         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1750                                 rec->force_rebalance_nodes,
1751                                 takeover_fail_callback,
1752                                 banning_credits_on_fail ? rec : NULL);
1753
1754         /* Reenable takeover runs and IP checks on other nodes */
1755         dtr.timeout = 0;
1756         for (i = 0; i < talloc_array_length(nodes); i++) {
1757                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1758                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1759                                              data) != 0) {
1760                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1761                 }
1762         }
1763
1764         if (ret != 0) {
1765                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1766                 ok = false;
1767                 goto done;
1768         }
1769
1770         ok = true;
1771         /* Takeover run was successful so clear force rebalance targets */
1772         if (rebalance_nodes == rec->force_rebalance_nodes) {
1773                 TALLOC_FREE(rec->force_rebalance_nodes);
1774         } else {
1775                 DEBUG(DEBUG_WARNING,
1776                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1777         }
1778 done:
1779         rec->need_takeover_run = !ok;
1780         talloc_free(nodes);
1781         ctdb_op_end(rec->takeover_run);
1782
1783         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1784         return ok;
1785 }
1786
1787 struct recovery_helper_state {
1788         int fd[2];
1789         pid_t pid;
1790         int result;
1791         bool done;
1792 };
1793
1794 static void ctdb_recovery_handler(struct tevent_context *ev,
1795                                   struct tevent_fd *fde,
1796                                   uint16_t flags, void *private_data)
1797 {
1798         struct recovery_helper_state *state = talloc_get_type_abort(
1799                 private_data, struct recovery_helper_state);
1800         int ret;
1801
1802         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1803         if (ret != sizeof(state->result)) {
1804                 state->result = EPIPE;
1805         }
1806
1807         state->done = true;
1808 }
1809
1810
1811 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1812 {
1813         static char prog[PATH_MAX+1] = "";
1814         const char **args;
1815         struct recovery_helper_state *state;
1816         struct tevent_fd *fde;
1817         int nargs, ret;
1818
1819         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1820                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1821                              "ctdb_recovery_helper")) {
1822                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1823         }
1824
1825         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1826         if (state == NULL) {
1827                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1828                 return -1;
1829         }
1830
1831         state->pid = -1;
1832
1833         ret = pipe(state->fd);
1834         if (ret != 0) {
1835                 DEBUG(DEBUG_ERR,
1836                       ("Failed to create pipe for recovery helper\n"));
1837                 goto fail;
1838         }
1839
1840         set_close_on_exec(state->fd[0]);
1841
1842         nargs = 4;
1843         args = talloc_array(state, const char *, nargs);
1844         if (args == NULL) {
1845                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1846                 goto fail;
1847         }
1848
1849         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1850         args[1] = rec->ctdb->daemon.name;
1851         args[2] = talloc_asprintf(args, "%u", new_generation());
1852         args[3] = NULL;
1853
1854         if (args[0] == NULL || args[2] == NULL) {
1855                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1856                 goto fail;
1857         }
1858
1859         setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
1860
1861         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1862                                      args, NULL, NULL, &state->pid)) {
1863                 DEBUG(DEBUG_ERR,
1864                       ("Failed to create child for recovery helper\n"));
1865                 goto fail;
1866         }
1867
1868         close(state->fd[1]);
1869         state->fd[1] = -1;
1870
1871         state->done = false;
1872
1873         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1874                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1875         if (fde == NULL) {
1876                 goto fail;
1877         }
1878         tevent_fd_set_auto_close(fde);
1879
1880         while (!state->done) {
1881                 tevent_loop_once(rec->ctdb->ev);
1882         }
1883
1884         close(state->fd[0]);
1885         state->fd[0] = -1;
1886
1887         if (state->result != 0) {
1888                 goto fail;
1889         }
1890
1891         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1892         talloc_free(state);
1893         return 0;
1894
1895 fail:
1896         if (state->fd[0] != -1) {
1897                 close(state->fd[0]);
1898         }
1899         if (state->fd[1] != -1) {
1900                 close(state->fd[1]);
1901         }
1902         if (state->pid != -1) {
1903                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1904         }
1905         talloc_free(state);
1906         return -1;
1907 }
1908
1909 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1910                               uint32_t pnn, struct ctdb_node_map_old *nodemap,
1911                               struct ctdb_vnn_map *vnnmap,
1912                               struct ctdb_dbid_map_old *dbmap)
1913 {
1914         struct ctdb_context *ctdb = rec->ctdb;
1915         uint32_t generation;
1916         TDB_DATA data;
1917         uint32_t *nodes;
1918         int ret, i, j;
1919
1920         /* set recovery mode to active on all nodes */
1921         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1922         if (ret != 0) {
1923                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1924                 return -1;
1925         }
1926
1927         /* execute the "startrecovery" event script on all nodes */
1928         ret = run_startrecovery_eventscript(rec, nodemap);
1929         if (ret!=0) {
1930                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1931                 return -1;
1932         }
1933
1934         /* pick a new generation number */
1935         generation = new_generation();
1936
1937         /* change the vnnmap on this node to use the new generation 
1938            number but not on any other nodes.
1939            this guarantees that if we abort the recovery prematurely
1940            for some reason (a node stops responding?)
1941            that we can just return immediately and we will reenter
1942            recovery shortly again.
1943            I.e. we deliberately leave the cluster with an inconsistent
1944            generation id to allow us to abort recovery at any stage and
1945            just restart it from scratch.
1946          */
1947         vnnmap->generation = generation;
1948         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1949         if (ret != 0) {
1950                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1951                 return -1;
1952         }
1953
1954         /* Database generations are updated when the transaction is commited to
1955          * the databases.  So make sure to use the final generation as the
1956          * transaction id
1957          */
1958         generation = new_generation();
1959
1960         data.dptr = (void *)&generation;
1961         data.dsize = sizeof(uint32_t);
1962
1963         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1964         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1965                                         nodes, 0,
1966                                         CONTROL_TIMEOUT(), false, data,
1967                                         NULL,
1968                                         transaction_start_fail_callback,
1969                                         rec) != 0) {
1970                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1971                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1972                                         nodes, 0,
1973                                         CONTROL_TIMEOUT(), false, tdb_null,
1974                                         NULL,
1975                                         NULL,
1976                                         NULL) != 0) {
1977                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1978                 }
1979                 return -1;
1980         }
1981
1982         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1983
1984         for (i=0;i<dbmap->num;i++) {
1985                 ret = recover_database(rec, mem_ctx,
1986                                        dbmap->dbs[i].db_id,
1987                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1988                                        pnn, nodemap, generation);
1989                 if (ret != 0) {
1990                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
1991                         return -1;
1992                 }
1993         }
1994
1995         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1996
1997         /* commit all the changes */
1998         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1999                                         nodes, 0,
2000                                         CONTROL_TIMEOUT(), false, data,
2001                                         NULL, NULL,
2002                                         NULL) != 0) {
2003                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
2004                 return -1;
2005         }
2006
2007         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
2008
2009         /* build a new vnn map with all the currently active and
2010            unbanned nodes */
2011         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
2012         CTDB_NO_MEMORY(ctdb, vnnmap);
2013         vnnmap->generation = generation;
2014         vnnmap->size = 0;
2015         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
2016         CTDB_NO_MEMORY(ctdb, vnnmap->map);
2017         for (i=j=0;i<nodemap->num;i++) {
2018                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2019                         continue;
2020                 }
2021                 if (!ctdb_node_has_capabilities(rec->caps,
2022                                                 ctdb->nodes[i]->pnn,
2023                                                 CTDB_CAP_LMASTER)) {
2024                         /* this node can not be an lmaster */
2025                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
2026                         continue;
2027                 }
2028
2029                 vnnmap->size++;
2030                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2031                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2032                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
2033
2034         }
2035         if (vnnmap->size == 0) {
2036                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
2037                 vnnmap->size++;
2038                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2039                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2040                 vnnmap->map[0] = pnn;
2041         }
2042
2043         /* update to the new vnnmap on all nodes */
2044         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
2045         if (ret != 0) {
2046                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
2047                 return -1;
2048         }
2049
2050         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2051
2052         /* disable recovery mode */
2053         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
2054         if (ret != 0) {
2055                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2056                 return -1;
2057         }
2058
2059         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2060
2061         return 0;
2062 }
2063
2064 /*
2065   we are the recmaster, and recovery is needed - start a recovery run
2066  */
2067 static int do_recovery(struct ctdb_recoverd *rec,
2068                        TALLOC_CTX *mem_ctx, uint32_t pnn,
2069                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
2070 {
2071         struct ctdb_context *ctdb = rec->ctdb;
2072         int i, ret;
2073         struct ctdb_dbid_map_old *dbmap;
2074         bool self_ban;
2075         bool par_recovery;
2076
2077         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2078
2079         /* Check if the current node is still the recmaster.  It's possible that
2080          * re-election has changed the recmaster.
2081          */
2082         if (pnn != rec->recmaster) {
2083                 DEBUG(DEBUG_NOTICE,
2084                       ("Recovery master changed to %u, aborting recovery\n",
2085                        rec->recmaster));
2086                 return -1;
2087         }
2088
2089         /* if recovery fails, force it again */
2090         rec->need_recovery = true;
2091
2092         if (!ctdb_op_begin(rec->recovery)) {
2093                 return -1;
2094         }
2095
2096         if (rec->election_timeout) {
2097                 /* an election is in progress */
2098                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2099                 goto fail;
2100         }
2101
2102         ban_misbehaving_nodes(rec, &self_ban);
2103         if (self_ban) {
2104                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2105                 goto fail;
2106         }
2107
2108         if (ctdb->recovery_lock_file != NULL) {
2109                 if (ctdb_recovery_have_lock(ctdb)) {
2110                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2111                 } else {
2112                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2113                                              ctdb->recovery_lock_file));
2114                         if (!ctdb_recovery_lock(ctdb)) {
2115                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2116                                         /* If ctdb is trying first recovery, it's
2117                                          * possible that current node does not know
2118                                          * yet who the recmaster is.
2119                                          */
2120                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2121                                                           " - retrying recovery\n"));
2122                                         goto fail;
2123                                 }
2124
2125                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2126                                                  "and ban ourself for %u seconds\n",
2127                                                  ctdb->tunable.recovery_ban_period));
2128                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2129                                 goto fail;
2130                         }
2131                         DEBUG(DEBUG_NOTICE,
2132                               ("Recovery lock taken successfully by recovery daemon\n"));
2133                 }
2134         }
2135
2136         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2137
2138         /* get a list of all databases */
2139         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2140         if (ret != 0) {
2141                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
2142                 goto fail;
2143         }
2144
2145         /* we do the db creation before we set the recovery mode, so the freeze happens
2146            on all databases we will be dealing with. */
2147
2148         /* verify that we have all the databases any other node has */
2149         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2150         if (ret != 0) {
2151                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2152                 goto fail;
2153         }
2154
2155         /* verify that all other nodes have all our databases */
2156         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2159                 goto fail;
2160         }
2161         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2162
2163         /* update the database priority for all remote databases */
2164         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2165         if (ret != 0) {
2166                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2167         }
2168         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2169
2170
2171         /* update all other nodes to use the same setting for reclock files
2172            as the local recovery master.
2173         */
2174         sync_recovery_lock_file_across_cluster(rec);
2175
2176         /* Retrieve capabilities from all connected nodes */
2177         ret = update_capabilities(rec, nodemap);
2178         if (ret!=0) {
2179                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2180                 return -1;
2181         }
2182
2183         /*
2184           update all nodes to have the same flags that we have
2185          */
2186         for (i=0;i<nodemap->num;i++) {
2187                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2188                         continue;
2189                 }
2190
2191                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2192                 if (ret != 0) {
2193                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2194                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2195                         } else {
2196                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2197                                 return -1;
2198                         }
2199                 }
2200         }
2201
2202         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2203
2204         /* Check if all participating nodes have parallel recovery capability */
2205         par_recovery = true;
2206         for (i=0; i<nodemap->num; i++) {
2207                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2208                         continue;
2209                 }
2210
2211                 if (!(rec->caps[i].capabilities &
2212                       CTDB_CAP_PARALLEL_RECOVERY)) {
2213                         par_recovery = false;
2214                         break;
2215                 }
2216         }
2217
2218         if (par_recovery) {
2219                 ret = db_recovery_parallel(rec, mem_ctx);
2220         } else {
2221                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2222                                          dbmap);
2223         }
2224
2225         if (ret != 0) {
2226                 goto fail;
2227         }
2228
2229         do_takeover_run(rec, nodemap, false);
2230
2231         /* execute the "recovered" event script on all nodes */
2232         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2233         if (ret!=0) {
2234                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2235                 goto fail;
2236         }
2237
2238         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2239
2240         /* send a message to all clients telling them that the cluster 
2241            has been reconfigured */
2242         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2243                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2244         if (ret != 0) {
2245                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2246                 goto fail;
2247         }
2248
2249         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2250
2251         rec->need_recovery = false;
2252         ctdb_op_end(rec->recovery);
2253
2254         /* we managed to complete a full recovery, make sure to forgive
2255            any past sins by the nodes that could now participate in the
2256            recovery.
2257         */
2258         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2259         for (i=0;i<nodemap->num;i++) {
2260                 struct ctdb_banning_state *ban_state;
2261
2262                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2263                         continue;
2264                 }
2265
2266                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2267                 if (ban_state == NULL) {
2268                         continue;
2269                 }
2270
2271                 ban_state->count = 0;
2272         }
2273
2274         /* We just finished a recovery successfully.
2275            We now wait for rerecovery_timeout before we allow
2276            another recovery to take place.
2277         */
2278         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2279         ctdb_op_disable(rec->recovery, ctdb->ev,
2280                         ctdb->tunable.rerecovery_timeout);
2281         return 0;
2282
2283 fail:
2284         ctdb_op_end(rec->recovery);
2285         return -1;
2286 }
2287
2288
2289 /*
2290   elections are won by first checking the number of connected nodes, then
2291   the priority time, then the pnn
2292  */
2293 struct election_message {
2294         uint32_t num_connected;
2295         struct timeval priority_time;
2296         uint32_t pnn;
2297         uint32_t node_flags;
2298 };
2299
2300 /*
2301   form this nodes election data
2302  */
2303 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2304 {
2305         int ret, i;
2306         struct ctdb_node_map_old *nodemap;
2307         struct ctdb_context *ctdb = rec->ctdb;
2308
2309         ZERO_STRUCTP(em);
2310
2311         em->pnn = rec->ctdb->pnn;
2312         em->priority_time = rec->priority_time;
2313
2314         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2315         if (ret != 0) {
2316                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2317                 return;
2318         }
2319
2320         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2321         em->node_flags = rec->node_flags;
2322
2323         for (i=0;i<nodemap->num;i++) {
2324                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2325                         em->num_connected++;
2326                 }
2327         }
2328
2329         /* we shouldnt try to win this election if we cant be a recmaster */
2330         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2331                 em->num_connected = 0;
2332                 em->priority_time = timeval_current();
2333         }
2334
2335         talloc_free(nodemap);
2336 }
2337
2338 /*
2339   see if the given election data wins
2340  */
2341 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2342 {
2343         struct election_message myem;
2344         int cmp = 0;
2345
2346         ctdb_election_data(rec, &myem);
2347
2348         /* we cant win if we don't have the recmaster capability */
2349         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2350                 return false;
2351         }
2352
2353         /* we cant win if we are banned */
2354         if (rec->node_flags & NODE_FLAGS_BANNED) {
2355                 return false;
2356         }
2357
2358         /* we cant win if we are stopped */
2359         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2360                 return false;
2361         }
2362
2363         /* we will automatically win if the other node is banned */
2364         if (em->node_flags & NODE_FLAGS_BANNED) {
2365                 return true;
2366         }
2367
2368         /* we will automatically win if the other node is banned */
2369         if (em->node_flags & NODE_FLAGS_STOPPED) {
2370                 return true;
2371         }
2372
2373         /* then the longest running node */
2374         if (cmp == 0) {
2375                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2376         }
2377
2378         if (cmp == 0) {
2379                 cmp = (int)myem.pnn - (int)em->pnn;
2380         }
2381
2382         return cmp > 0;
2383 }
2384
2385 /*
2386   send out an election request
2387  */
2388 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2389 {
2390         int ret;
2391         TDB_DATA election_data;
2392         struct election_message emsg;
2393         uint64_t srvid;
2394         struct ctdb_context *ctdb = rec->ctdb;
2395
2396         srvid = CTDB_SRVID_ELECTION;
2397
2398         ctdb_election_data(rec, &emsg);
2399
2400         election_data.dsize = sizeof(struct election_message);
2401         election_data.dptr  = (unsigned char *)&emsg;
2402
2403
2404         /* first we assume we will win the election and set 
2405            recoverymaster to be ourself on the current node
2406          */
2407         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2408                                      CTDB_CURRENT_NODE, pnn);
2409         if (ret != 0) {
2410                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
2411                 return -1;
2412         }
2413         rec->recmaster = pnn;
2414
2415         /* send an election message to all active nodes */
2416         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2417         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2418 }
2419
2420 /*
2421   we think we are winning the election - send a broadcast election request
2422  */
2423 static void election_send_request(struct tevent_context *ev,
2424                                   struct tevent_timer *te,
2425                                   struct timeval t, void *p)
2426 {
2427         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2428         int ret;
2429
2430         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2431         if (ret != 0) {
2432                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2433         }
2434
2435         TALLOC_FREE(rec->send_election_te);
2436 }
2437
2438 /*
2439   handler for memory dumps
2440 */
2441 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2442 {
2443         struct ctdb_recoverd *rec = talloc_get_type(
2444                 private_data, struct ctdb_recoverd);
2445         struct ctdb_context *ctdb = rec->ctdb;
2446         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2447         TDB_DATA *dump;
2448         int ret;
2449         struct ctdb_srvid_message *rd;
2450
2451         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2452                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2453                 talloc_free(tmp_ctx);
2454                 return;
2455         }
2456         rd = (struct ctdb_srvid_message *)data.dptr;
2457
2458         dump = talloc_zero(tmp_ctx, TDB_DATA);
2459         if (dump == NULL) {
2460                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2461                 talloc_free(tmp_ctx);
2462                 return;
2463         }
2464         ret = ctdb_dump_memory(ctdb, dump);
2465         if (ret != 0) {
2466                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2467                 talloc_free(tmp_ctx);
2468                 return;
2469         }
2470
2471 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2472
2473         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2474         if (ret != 0) {
2475                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2476                 talloc_free(tmp_ctx);
2477                 return;
2478         }
2479
2480         talloc_free(tmp_ctx);
2481 }
2482
2483 /*
2484   handler for reload_nodes
2485 */
2486 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2487                                  void *private_data)
2488 {
2489         struct ctdb_recoverd *rec = talloc_get_type(
2490                 private_data, struct ctdb_recoverd);
2491
2492         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2493
2494         ctdb_load_nodes_file(rec->ctdb);
2495 }
2496
2497
2498 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2499                                         void *private_data)
2500 {
2501         struct ctdb_recoverd *rec = talloc_get_type(
2502                 private_data, struct ctdb_recoverd);
2503         struct ctdb_context *ctdb = rec->ctdb;
2504         uint32_t pnn;
2505         uint32_t *t;
2506         int len;
2507
2508         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2509                 return;
2510         }
2511
2512         if (data.dsize != sizeof(uint32_t)) {
2513                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2514                 return;
2515         }
2516
2517         pnn = *(uint32_t *)&data.dptr[0];
2518
2519         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2520
2521         /* Copy any existing list of nodes.  There's probably some
2522          * sort of realloc variant that will do this but we need to
2523          * make sure that freeing the old array also cancels the timer
2524          * event for the timeout... not sure if realloc will do that.
2525          */
2526         len = (rec->force_rebalance_nodes != NULL) ?
2527                 talloc_array_length(rec->force_rebalance_nodes) :
2528                 0;
2529
2530         /* This allows duplicates to be added but they don't cause
2531          * harm.  A call to add a duplicate PNN arguably means that
2532          * the timeout should be reset, so this is the simplest
2533          * solution.
2534          */
2535         t = talloc_zero_array(rec, uint32_t, len+1);
2536         CTDB_NO_MEMORY_VOID(ctdb, t);
2537         if (len > 0) {
2538                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2539         }
2540         t[len] = pnn;
2541
2542         talloc_free(rec->force_rebalance_nodes);
2543
2544         rec->force_rebalance_nodes = t;
2545 }
2546
2547
2548
2549 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2550                                    void *private_data)
2551 {
2552         struct ctdb_recoverd *rec = talloc_get_type(
2553                 private_data, struct ctdb_recoverd);
2554         struct ctdb_public_ip *ip;
2555
2556         if (rec->recmaster != rec->ctdb->pnn) {
2557                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2558                 return;
2559         }
2560
2561         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2562                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2563                 return;
2564         }
2565
2566         ip = (struct ctdb_public_ip *)data.dptr;
2567
2568         update_ip_assignment_tree(rec->ctdb, ip);
2569 }
2570
2571 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2572                                     TDB_DATA data,
2573                                     struct ctdb_op_state *op_state)
2574 {
2575         struct ctdb_disable_message *r;
2576         uint32_t timeout;
2577         TDB_DATA result;
2578         int32_t ret = 0;
2579
2580         /* Validate input data */
2581         if (data.dsize != sizeof(struct ctdb_disable_message)) {
2582                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2583                                  "expecting %lu\n", (long unsigned)data.dsize,
2584                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
2585                 return;
2586         }
2587         if (data.dptr == NULL) {
2588                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2589                 return;
2590         }
2591
2592         r = (struct ctdb_disable_message *)data.dptr;
2593         timeout = r->timeout;
2594
2595         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2596         if (ret != 0) {
2597                 goto done;
2598         }
2599
2600         /* Returning our PNN tells the caller that we succeeded */
2601         ret = ctdb_get_pnn(ctdb);
2602 done:
2603         result.dsize = sizeof(int32_t);
2604         result.dptr  = (uint8_t *)&ret;
2605         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
2606 }
2607
2608 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2609                                           void *private_data)
2610 {
2611         struct ctdb_recoverd *rec = talloc_get_type(
2612                 private_data, struct ctdb_recoverd);
2613
2614         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2615 }
2616
2617 /* Backward compatibility for this SRVID */
2618 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2619                                      void *private_data)
2620 {
2621         struct ctdb_recoverd *rec = talloc_get_type(
2622                 private_data, struct ctdb_recoverd);
2623         uint32_t timeout;
2624
2625         if (data.dsize != sizeof(uint32_t)) {
2626                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2627                                  "expecting %lu\n", (long unsigned)data.dsize,
2628                                  (long unsigned)sizeof(uint32_t)));
2629                 return;
2630         }
2631         if (data.dptr == NULL) {
2632                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2633                 return;
2634         }
2635
2636         timeout = *((uint32_t *)data.dptr);
2637
2638         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2639 }
2640
2641 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2642                                        void *private_data)
2643 {
2644         struct ctdb_recoverd *rec = talloc_get_type(
2645                 private_data, struct ctdb_recoverd);
2646
2647         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2648 }
2649
2650 /*
2651   handler for ip reallocate, just add it to the list of requests and 
2652   handle this later in the monitor_cluster loop so we do not recurse
2653   with other requests to takeover_run()
2654 */
2655 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2656                                   void *private_data)
2657 {
2658         struct ctdb_srvid_message *request;
2659         struct ctdb_recoverd *rec = talloc_get_type(
2660                 private_data, struct ctdb_recoverd);
2661
2662         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2663                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2664                 return;
2665         }
2666
2667         request = (struct ctdb_srvid_message *)data.dptr;
2668
2669         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2670 }
2671
2672 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2673                                           struct ctdb_recoverd *rec)
2674 {
2675         TDB_DATA result;
2676         int32_t ret;
2677         struct srvid_requests *current;
2678
2679         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2680
2681         /* Only process requests that are currently pending.  More
2682          * might come in while the takeover run is in progress and
2683          * they will need to be processed later since they might
2684          * be in response flag changes.
2685          */
2686         current = rec->reallocate_requests;
2687         rec->reallocate_requests = NULL;
2688
2689         if (do_takeover_run(rec, rec->nodemap, false)) {
2690                 ret = ctdb_get_pnn(ctdb);
2691         } else {
2692                 ret = -1;
2693         }
2694
2695         result.dsize = sizeof(int32_t);
2696         result.dptr  = (uint8_t *)&ret;
2697
2698         srvid_requests_reply(ctdb, &current, result);
2699 }
2700
2701 /*
2702  * handler for assigning banning credits
2703  */
2704 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2705 {
2706         struct ctdb_recoverd *rec = talloc_get_type(
2707                 private_data, struct ctdb_recoverd);
2708         uint32_t ban_pnn;
2709
2710         /* Ignore if we are not recmaster */
2711         if (rec->ctdb->pnn != rec->recmaster) {
2712                 return;
2713         }
2714
2715         if (data.dsize != sizeof(uint32_t)) {
2716                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
2717                                   data.dsize));
2718                 return;
2719         }
2720
2721         ban_pnn = *(uint32_t *)data.dptr;
2722
2723         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
2724 }
2725
2726 /*
2727   handler for recovery master elections
2728 */
2729 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2730 {
2731         struct ctdb_recoverd *rec = talloc_get_type(
2732                 private_data, struct ctdb_recoverd);
2733         struct ctdb_context *ctdb = rec->ctdb;
2734         int ret;
2735         struct election_message *em = (struct election_message *)data.dptr;
2736
2737         /* Ignore election packets from ourself */
2738         if (ctdb->pnn == em->pnn) {
2739                 return;
2740         }
2741
2742         /* we got an election packet - update the timeout for the election */
2743         talloc_free(rec->election_timeout);
2744         rec->election_timeout = tevent_add_timer(
2745                         ctdb->ev, ctdb,
2746                         fast_start ?
2747                                 timeval_current_ofs(0, 500000) :
2748                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2749                         ctdb_election_timeout, rec);
2750
2751         /* someone called an election. check their election data
2752            and if we disagree and we would rather be the elected node, 
2753            send a new election message to all other nodes
2754          */
2755         if (ctdb_election_win(rec, em)) {
2756                 if (!rec->send_election_te) {
2757                         rec->send_election_te = tevent_add_timer(
2758                                         ctdb->ev, rec,
2759                                         timeval_current_ofs(0, 500000),
2760                                         election_send_request, rec);
2761                 }
2762                 return;
2763         }
2764
2765         /* we didn't win */
2766         TALLOC_FREE(rec->send_election_te);
2767
2768         /* Release the recovery lock file */
2769         if (ctdb_recovery_have_lock(ctdb)) {
2770                 ctdb_recovery_unlock(ctdb);
2771         }
2772
2773         clear_ip_assignment_tree(ctdb);
2774
2775         /* ok, let that guy become recmaster then */
2776         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2777                                      CTDB_CURRENT_NODE, em->pnn);
2778         if (ret != 0) {
2779                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
2780                 return;
2781         }
2782         rec->recmaster = em->pnn;
2783
2784         return;
2785 }
2786
2787
2788 /*
2789   force the start of the election process
2790  */
2791 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2792                            struct ctdb_node_map_old *nodemap)
2793 {
2794         int ret;
2795         struct ctdb_context *ctdb = rec->ctdb;
2796
2797         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2798
2799         /* set all nodes to recovery mode to stop all internode traffic */
2800         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2801         if (ret != 0) {
2802                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2803                 return;
2804         }
2805
2806         talloc_free(rec->election_timeout);
2807         rec->election_timeout = tevent_add_timer(
2808                         ctdb->ev, ctdb,
2809                         fast_start ?
2810                                 timeval_current_ofs(0, 500000) :
2811                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2812                         ctdb_election_timeout, rec);
2813
2814         ret = send_election_request(rec, pnn);
2815         if (ret!=0) {
2816                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2817                 return;
2818         }
2819
2820         /* wait for a few seconds to collect all responses */
2821         ctdb_wait_election(rec);
2822 }
2823
2824
2825
2826 /*
2827   handler for when a node changes its flags
2828 */
2829 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2830 {
2831         struct ctdb_recoverd *rec = talloc_get_type(
2832                 private_data, struct ctdb_recoverd);
2833         struct ctdb_context *ctdb = rec->ctdb;
2834         int ret;
2835         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2836         struct ctdb_node_map_old *nodemap=NULL;
2837         TALLOC_CTX *tmp_ctx;
2838         int i;
2839         int disabled_flag_changed;
2840
2841         if (data.dsize != sizeof(*c)) {
2842                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2843                 return;
2844         }
2845
2846         tmp_ctx = talloc_new(ctdb);
2847         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2848
2849         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2850         if (ret != 0) {
2851                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2852                 talloc_free(tmp_ctx);
2853                 return;         
2854         }
2855
2856
2857         for (i=0;i<nodemap->num;i++) {
2858                 if (nodemap->nodes[i].pnn == c->pnn) break;
2859         }
2860
2861         if (i == nodemap->num) {
2862                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2863                 talloc_free(tmp_ctx);
2864                 return;
2865         }
2866
2867         if (c->old_flags != c->new_flags) {
2868                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2869         }
2870
2871         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2872
2873         nodemap->nodes[i].flags = c->new_flags;
2874
2875         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(),
2876                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2877
2878         if (ret == 0 &&
2879             rec->recmaster == ctdb->pnn &&
2880             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2881                 /* Only do the takeover run if the perm disabled or unhealthy
2882                    flags changed since these will cause an ip failover but not
2883                    a recovery.
2884                    If the node became disconnected or banned this will also
2885                    lead to an ip address failover but that is handled 
2886                    during recovery
2887                 */
2888                 if (disabled_flag_changed) {
2889                         rec->need_takeover_run = true;
2890                 }
2891         }
2892
2893         talloc_free(tmp_ctx);
2894 }
2895
2896 /*
2897   handler for when we need to push out flag changes ot all other nodes
2898 */
2899 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2900                                void *private_data)
2901 {
2902         struct ctdb_recoverd *rec = talloc_get_type(
2903                 private_data, struct ctdb_recoverd);
2904         struct ctdb_context *ctdb = rec->ctdb;
2905         int ret;
2906         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2907         struct ctdb_node_map_old *nodemap=NULL;
2908         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2909         uint32_t *nodes;
2910
2911         /* read the node flags from the recmaster */
2912         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2913                                    tmp_ctx, &nodemap);
2914         if (ret != 0) {
2915                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2916                 talloc_free(tmp_ctx);
2917                 return;
2918         }
2919         if (c->pnn >= nodemap->num) {
2920                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2921                 talloc_free(tmp_ctx);
2922                 return;
2923         }
2924
2925         /* send the flags update to all connected nodes */
2926         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2927
2928         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2929                                       nodes, 0, CONTROL_TIMEOUT(),
2930                                       false, data,
2931                                       NULL, NULL,
2932                                       NULL) != 0) {
2933                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2934
2935                 talloc_free(tmp_ctx);
2936                 return;
2937         }
2938
2939         talloc_free(tmp_ctx);
2940 }
2941
2942
2943 struct verify_recmode_normal_data {
2944         uint32_t count;
2945         enum monitor_result status;
2946 };
2947
2948 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2949 {
2950         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2951
2952
2953         /* one more node has responded with recmode data*/
2954         rmdata->count--;
2955
2956         /* if we failed to get the recmode, then return an error and let
2957            the main loop try again.
2958         */
2959         if (state->state != CTDB_CONTROL_DONE) {
2960                 if (rmdata->status == MONITOR_OK) {
2961                         rmdata->status = MONITOR_FAILED;
2962                 }
2963                 return;
2964         }
2965
2966         /* if we got a response, then the recmode will be stored in the
2967            status field
2968         */
2969         if (state->status != CTDB_RECOVERY_NORMAL) {
2970                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2971                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2972         }
2973
2974         return;
2975 }
2976
2977
2978 /* verify that all nodes are in normal recovery mode */
2979 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2980 {
2981         struct verify_recmode_normal_data *rmdata;
2982         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2983         struct ctdb_client_control_state *state;
2984         enum monitor_result status;
2985         int j;
2986         
2987         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2988         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2989         rmdata->count  = 0;
2990         rmdata->status = MONITOR_OK;
2991
2992         /* loop over all active nodes and send an async getrecmode call to 
2993            them*/
2994         for (j=0; j<nodemap->num; j++) {
2995                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2996                         continue;
2997                 }
2998                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2999                                         CONTROL_TIMEOUT(), 
3000                                         nodemap->nodes[j].pnn);
3001                 if (state == NULL) {
3002                         /* we failed to send the control, treat this as 
3003                            an error and try again next iteration
3004                         */                      
3005                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
3006                         talloc_free(mem_ctx);
3007                         return MONITOR_FAILED;
3008                 }
3009
3010                 /* set up the callback functions */
3011                 state->async.fn = verify_recmode_normal_callback;
3012                 state->async.private_data = rmdata;
3013
3014                 /* one more control to wait for to complete */
3015                 rmdata->count++;
3016         }
3017
3018
3019         /* now wait for up to the maximum number of seconds allowed
3020            or until all nodes we expect a response from has replied
3021         */
3022         while (rmdata->count > 0) {
3023                 tevent_loop_once(ctdb->ev);
3024         }
3025
3026         status = rmdata->status;
3027         talloc_free(mem_ctx);
3028         return status;
3029 }
3030
3031
3032 struct verify_recmaster_data {
3033         struct ctdb_recoverd *rec;
3034         uint32_t count;
3035         uint32_t pnn;
3036         enum monitor_result status;
3037 };
3038
3039 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3040 {
3041         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3042
3043
3044         /* one more node has responded with recmaster data*/
3045         rmdata->count--;
3046
3047         /* if we failed to get the recmaster, then return an error and let
3048            the main loop try again.
3049         */
3050         if (state->state != CTDB_CONTROL_DONE) {
3051                 if (rmdata->status == MONITOR_OK) {
3052                         rmdata->status = MONITOR_FAILED;
3053                 }
3054                 return;
3055         }
3056
3057         /* if we got a response, then the recmaster will be stored in the
3058            status field
3059         */
3060         if (state->status != rmdata->pnn) {
3061                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3062                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3063                 rmdata->status = MONITOR_ELECTION_NEEDED;
3064         }
3065
3066         return;
3067 }
3068
3069
3070 /* verify that all nodes agree that we are the recmaster */
3071 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3072 {
3073         struct ctdb_context *ctdb = rec->ctdb;
3074         struct verify_recmaster_data *rmdata;
3075         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3076         struct ctdb_client_control_state *state;
3077         enum monitor_result status;
3078         int j;
3079         
3080         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3081         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3082         rmdata->rec    = rec;
3083         rmdata->count  = 0;
3084         rmdata->pnn    = pnn;
3085         rmdata->status = MONITOR_OK;
3086
3087         /* loop over all active nodes and send an async getrecmaster call to
3088            them*/
3089         for (j=0; j<nodemap->num; j++) {
3090                 if (nodemap->nodes[j].pnn == rec->recmaster) {
3091                         continue;
3092                 }
3093                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3094                         continue;
3095                 }
3096                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3097                                         CONTROL_TIMEOUT(),
3098                                         nodemap->nodes[j].pnn);
3099                 if (state == NULL) {
3100                         /* we failed to send the control, treat this as 
3101                            an error and try again next iteration
3102                         */                      
3103                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3104                         talloc_free(mem_ctx);
3105                         return MONITOR_FAILED;
3106                 }
3107
3108                 /* set up the callback functions */
3109                 state->async.fn = verify_recmaster_callback;
3110                 state->async.private_data = rmdata;
3111
3112                 /* one more control to wait for to complete */
3113                 rmdata->count++;
3114         }
3115
3116
3117         /* now wait for up to the maximum number of seconds allowed
3118            or until all nodes we expect a response from has replied
3119         */
3120         while (rmdata->count > 0) {
3121                 tevent_loop_once(ctdb->ev);
3122         }
3123
3124         status = rmdata->status;
3125         talloc_free(mem_ctx);
3126         return status;
3127 }
3128
3129 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3130                                     struct ctdb_recoverd *rec)
3131 {
3132         struct ctdb_iface_list_old *ifaces = NULL;
3133         TALLOC_CTX *mem_ctx;
3134         bool ret = false;
3135
3136         mem_ctx = talloc_new(NULL);
3137
3138         /* Read the interfaces from the local node */
3139         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3140                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3141                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3142                 /* We could return an error.  However, this will be
3143                  * rare so we'll decide that the interfaces have
3144                  * actually changed, just in case.
3145                  */
3146                 talloc_free(mem_ctx);
3147                 return true;
3148         }
3149
3150         if (!rec->ifaces) {
3151                 /* We haven't been here before so things have changed */
3152                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3153                 ret = true;
3154         } else if (rec->ifaces->num != ifaces->num) {
3155                 /* Number of interfaces has changed */
3156                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3157                                      rec->ifaces->num, ifaces->num));
3158                 ret = true;
3159         } else {
3160                 /* See if interface names or link states have changed */
3161                 int i;
3162                 for (i = 0; i < rec->ifaces->num; i++) {
3163                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
3164                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3165                                 DEBUG(DEBUG_NOTICE,
3166                                       ("Interface in slot %d changed: %s => %s\n",
3167                                        i, iface->name, ifaces->ifaces[i].name));
3168                                 ret = true;
3169                                 break;
3170                         }
3171                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3172                                 DEBUG(DEBUG_NOTICE,
3173                                       ("Interface %s changed state: %d => %d\n",
3174                                        iface->name, iface->link_state,
3175                                        ifaces->ifaces[i].link_state));
3176                                 ret = true;
3177                                 break;
3178                         }
3179                 }
3180         }
3181
3182         talloc_free(rec->ifaces);
3183         rec->ifaces = talloc_steal(rec, ifaces);
3184
3185         talloc_free(mem_ctx);
3186         return ret;
3187 }
3188
3189 /* Check that the local allocation of public IP addresses is correct
3190  * and do some house-keeping */
3191 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
3192                                       struct ctdb_recoverd *rec,
3193                                       uint32_t pnn,
3194                                       struct ctdb_node_map_old *nodemap)
3195 {
3196         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3197         int ret, j;
3198         bool need_takeover_run = false;
3199         struct ctdb_public_ip_list_old *ips = NULL;
3200
3201         /* If we are not the recmaster then do some housekeeping */
3202         if (rec->recmaster != pnn) {
3203                 /* Ignore any IP reallocate requests - only recmaster
3204                  * processes them
3205                  */
3206                 TALLOC_FREE(rec->reallocate_requests);
3207                 /* Clear any nodes that should be force rebalanced in
3208                  * the next takeover run.  If the recovery master role
3209                  * has moved then we don't want to process these some
3210                  * time in the future.
3211                  */
3212                 TALLOC_FREE(rec->force_rebalance_nodes);
3213         }
3214
3215         /* Return early if disabled... */
3216         if (ctdb->tunable.disable_ip_failover != 0 ||
3217             ctdb_op_is_disabled(rec->takeover_run)) {
3218                 return  0;
3219         }
3220
3221         if (interfaces_have_changed(ctdb, rec)) {
3222                 need_takeover_run = true;
3223         }
3224
3225         /* If there are unhosted IPs but this node can host them then
3226          * trigger an IP reallocation */
3227
3228         /* Read *available* IPs from local node */
3229         ret = ctdb_ctrl_get_public_ips_flags(
3230                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
3231                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3232         if (ret != 0) {
3233                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
3234                 talloc_free(mem_ctx);
3235                 return -1;
3236         }
3237
3238         for (j=0; j<ips->num; j++) {
3239                 if (ips->ips[j].pnn == -1 &&
3240                     nodemap->nodes[pnn].flags == 0) {
3241                         DEBUG(DEBUG_WARNING,
3242                               ("Unassigned IP %s can be served by this node\n",
3243                                ctdb_addr_to_str(&ips->ips[j].addr)));
3244                         need_takeover_run = true;
3245                 }
3246         }
3247
3248         talloc_free(ips);
3249
3250         if (!ctdb->do_checkpublicip) {
3251                 goto done;
3252         }
3253
3254         /* Validate the IP addresses that this node has on network
3255          * interfaces.  If there is an inconsistency between reality
3256          * and the state expected by CTDB then try to fix it by
3257          * triggering an IP reallocation or releasing extraneous IP
3258          * addresses. */
3259
3260         /* Read *known* IPs from local node */
3261         ret = ctdb_ctrl_get_public_ips_flags(
3262                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3263         if (ret != 0) {
3264                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
3265                 talloc_free(mem_ctx);
3266                 return -1;
3267         }
3268
3269         for (j=0; j<ips->num; j++) {
3270                 if (ips->ips[j].pnn == pnn) {
3271                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
3272                                 DEBUG(DEBUG_ERR,
3273                                       ("Assigned IP %s not on an interface\n",
3274                                        ctdb_addr_to_str(&ips->ips[j].addr)));
3275                                 need_takeover_run = true;
3276                         }
3277                 } else {
3278                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
3279                                 DEBUG(DEBUG_ERR,
3280                                       ("IP %s incorrectly on an interface - releasing\n",
3281                                        ctdb_addr_to_str(&ips->ips[j].addr)));
3282                                 ret = ctdb_ctrl_release_ip(ctdb,
3283                                                            CONTROL_TIMEOUT(),
3284                                                            CTDB_CURRENT_NODE,
3285                                                            &ips->ips[j]);
3286                                 if (ret != 0) {
3287                                         DEBUG(DEBUG_ERR,
3288                                               ("Failed to release IP address\n"));
3289                                 }
3290                         }
3291                 }
3292         }
3293
3294 done:
3295         if (need_takeover_run) {
3296                 struct ctdb_srvid_message rd;
3297                 TDB_DATA data;
3298
3299                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
3300
3301                 ZERO_STRUCT(rd);
3302                 rd.pnn = ctdb->pnn;
3303                 rd.srvid = 0;
3304                 data.dptr = (uint8_t *)&rd;
3305                 data.dsize = sizeof(rd);
3306
3307                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3308                 if (ret != 0) {
3309                         DEBUG(DEBUG_ERR,
3310                               ("Failed to send takeover run request\n"));
3311                 }
3312         }
3313         talloc_free(mem_ctx);
3314         return 0;
3315 }
3316
3317
3318 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3319 {
3320         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3321
3322         if (node_pnn >= ctdb->num_nodes) {
3323                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3324                 return;
3325         }
3326
3327         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3328
3329 }
3330
3331 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3332         struct ctdb_node_map_old *nodemap,
3333         struct ctdb_node_map_old **remote_nodemaps)
3334 {
3335         uint32_t *nodes;
3336
3337         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3338         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3339                                         nodes, 0,
3340                                         CONTROL_TIMEOUT(), false, tdb_null,
3341                                         async_getnodemap_callback,
3342                                         NULL,
3343                                         remote_nodemaps) != 0) {
3344                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3345
3346                 return -1;
3347         }
3348
3349         return 0;
3350 }
3351
3352 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3353 {
3354         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3355         const char *reclockfile;
3356
3357         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3358                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3359                 talloc_free(tmp_ctx);
3360                 return -1;      
3361         }
3362
3363         if (reclockfile == NULL) {
3364                 if (ctdb->recovery_lock_file != NULL) {
3365                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3366                         talloc_free(ctdb->recovery_lock_file);
3367                         ctdb->recovery_lock_file = NULL;
3368                         ctdb_recovery_unlock(ctdb);
3369                 }
3370                 talloc_free(tmp_ctx);
3371                 return 0;
3372         }
3373
3374         if (ctdb->recovery_lock_file == NULL) {
3375                 DEBUG(DEBUG_NOTICE,
3376                       ("Recovery lock file enabled (%s)\n", reclockfile));
3377                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3378                 ctdb_recovery_unlock(ctdb);
3379                 talloc_free(tmp_ctx);
3380                 return 0;
3381         }
3382
3383
3384         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3385                 talloc_free(tmp_ctx);
3386                 return 0;
3387         }
3388
3389         DEBUG(DEBUG_NOTICE,
3390               ("Recovery lock file changed (now %s)\n", reclockfile));
3391         talloc_free(ctdb->recovery_lock_file);
3392         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3393         ctdb_recovery_unlock(ctdb);
3394
3395         talloc_free(tmp_ctx);
3396         return 0;
3397 }
3398
3399 static bool validate_recovery_master(struct ctdb_recoverd *rec,
3400                                      TALLOC_CTX *mem_ctx)
3401 {
3402         struct ctdb_context *ctdb = rec->ctdb;
3403         uint32_t pnn = ctdb_get_pnn(ctdb);
3404         struct ctdb_node_map_old *nodemap = rec->nodemap;
3405         struct ctdb_node_map_old *recmaster_nodemap = NULL;
3406         int ret;
3407
3408         /* When recovery daemon is started, recmaster is set to
3409          * "unknown" so it knows to start an election.
3410          */
3411         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
3412                 DEBUG(DEBUG_NOTICE,
3413                       ("Initial recovery master set - forcing election\n"));
3414                 force_election(rec, pnn, nodemap);
3415                 return false;
3416         }
3417
3418         /*
3419          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3420          * but we have, then force an election and try to become the new
3421          * recmaster.
3422          */
3423         if (!ctdb_node_has_capabilities(rec->caps,
3424                                         rec->recmaster,
3425                                         CTDB_CAP_RECMASTER) &&
3426             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3427             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3428                 DEBUG(DEBUG_ERR,
3429                       (" Current recmaster node %u does not have CAP_RECMASTER,"
3430                        " but we (node %u) have - force an election\n",
3431                        rec->recmaster, pnn));
3432                 force_election(rec, pnn, nodemap);
3433                 return false;
3434         }
3435
3436         /* Verify that the master node has not been deleted.  This
3437          * should not happen because a node should always be shutdown
3438          * before being deleted, causing a new master to be elected
3439          * before now.  However, if something strange has happened
3440          * then checking here will ensure we don't index beyond the
3441          * end of the nodemap array. */
3442         if (rec->recmaster >= nodemap->num) {
3443                 DEBUG(DEBUG_ERR,
3444                       ("Recmaster node %u has been deleted. Force election\n",
3445                        rec->recmaster));
3446                 force_election(rec, pnn, nodemap);
3447                 return false;
3448         }
3449
3450         /* if recovery master is disconnected/deleted we must elect a new recmaster */
3451         if (nodemap->nodes[rec->recmaster].flags &
3452             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
3453                 DEBUG(DEBUG_NOTICE,
3454                       ("Recmaster node %u is disconnected/deleted. Force election\n",
3455                        rec->recmaster));
3456                 force_election(rec, pnn, nodemap);
3457                 return false;
3458         }
3459
3460         /* get nodemap from the recovery master to check if it is inactive */
3461         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
3462                                    mem_ctx, &recmaster_nodemap);
3463         if (ret != 0) {
3464                 DEBUG(DEBUG_ERR,
3465                       (__location__
3466                        " Unable to get nodemap from recovery master %u\n",
3467                           rec->recmaster));
3468                 /* No election, just error */
3469                 return false;
3470         }
3471
3472
3473         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
3474             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3475                 DEBUG(DEBUG_NOTICE,
3476                       ("Recmaster node %u is inactive. Force election\n",
3477                        rec->recmaster));
3478                 /*
3479                  * update our nodemap to carry the recmaster's notion of
3480                  * its own flags, so that we don't keep freezing the
3481                  * inactive recmaster node...
3482                  */
3483                 nodemap->nodes[rec->recmaster].flags =
3484                         recmaster_nodemap->nodes[rec->recmaster].flags;
3485                 force_election(rec, pnn, nodemap);
3486                 return false;
3487         }
3488
3489         return true;
3490 }
3491
3492 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3493                       TALLOC_CTX *mem_ctx)
3494 {
3495         uint32_t pnn;
3496         struct ctdb_node_map_old *nodemap=NULL;
3497         struct ctdb_node_map_old **remote_nodemaps=NULL;
3498         struct ctdb_vnn_map *vnnmap=NULL;
3499         struct ctdb_vnn_map *remote_vnnmap=NULL;
3500         uint32_t num_lmasters;
3501         int32_t debug_level;
3502         int i, j, ret;
3503         bool self_ban;
3504
3505
3506         /* verify that the main daemon is still running */
3507         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3508                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3509                 exit(-1);
3510         }
3511
3512         /* ping the local daemon to tell it we are alive */
3513         ctdb_ctrl_recd_ping(ctdb);
3514
3515         if (rec->election_timeout) {
3516                 /* an election is in progress */
3517                 return;
3518         }
3519
3520         /* read the debug level from the parent and update locally */
3521         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3522         if (ret !=0) {
3523                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3524                 return;
3525         }
3526         DEBUGLEVEL = debug_level;
3527
3528         /* get relevant tunables */
3529         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3530         if (ret != 0) {
3531                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3532                 return;
3533         }
3534
3535         /* get runstate */
3536         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3537                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3538         if (ret != 0) {
3539                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3540                 return;
3541         }
3542
3543         /* get the current recovery lock file from the server */
3544         if (update_recovery_lock_file(ctdb) != 0) {
3545                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3546                 return;
3547         }
3548
3549         pnn = ctdb_get_pnn(ctdb);
3550
3551         /* get nodemap */
3552         TALLOC_FREE(rec->nodemap);
3553         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3554         if (ret != 0) {
3555                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3556                 return;
3557         }
3558         nodemap = rec->nodemap;
3559
3560         /* remember our own node flags */
3561         rec->node_flags = nodemap->nodes[pnn].flags;
3562
3563         ban_misbehaving_nodes(rec, &self_ban);
3564         if (self_ban) {
3565                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3566                 return;
3567         }
3568
3569         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3570            also frozen and that the recmode is set to active.
3571         */
3572         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3573                 /* If this node has become inactive then we want to
3574                  * reduce the chances of it taking over the recovery
3575                  * master role when it becomes active again.  This
3576                  * helps to stabilise the recovery master role so that
3577                  * it stays on the most stable node.
3578                  */
3579                 rec->priority_time = timeval_current();
3580
3581                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3582                 if (ret != 0) {
3583                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3584                 }
3585                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3586                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3587
3588                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3589                         if (ret != 0) {
3590                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3591
3592                                 return;
3593                         }
3594                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3595                         if (ret != 0) {
3596                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3597                                 return;
3598                         }
3599                 }
3600
3601                 /* If this node is stopped or banned then it is not the recovery
3602                  * master, so don't do anything. This prevents stopped or banned
3603                  * node from starting election and sending unnecessary controls.
3604                  */
3605                 return;
3606         }
3607
3608         /* Retrieve capabilities from all connected nodes */
3609         ret = update_capabilities(rec, nodemap);
3610         if (ret != 0) {
3611                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3612                 return;
3613         }
3614
3615         if (! validate_recovery_master(rec, mem_ctx)) {
3616                 return;
3617         }
3618
3619         /* Check if an IP takeover run is needed and trigger one if
3620          * necessary */
3621         verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
3622
3623         /* if we are not the recmaster then we do not need to check
3624            if recovery is needed
3625          */
3626         if (pnn != rec->recmaster) {
3627                 return;
3628         }
3629
3630
3631         /* ensure our local copies of flags are right */
3632         ret = update_local_flags(rec, nodemap);
3633         if (ret != 0) {
3634                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3635                 return;
3636         }
3637
3638         if (ctdb->num_nodes != nodemap->num) {
3639                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3640                 ctdb_load_nodes_file(ctdb);
3641                 return;
3642         }
3643
3644         /* verify that all active nodes agree that we are the recmaster */
3645         switch (verify_recmaster(rec, nodemap, pnn)) {
3646         case MONITOR_RECOVERY_NEEDED:
3647                 /* can not happen */
3648                 return;
3649         case MONITOR_ELECTION_NEEDED:
3650                 force_election(rec, pnn, nodemap);
3651                 return;
3652         case MONITOR_OK:
3653                 break;
3654         case MONITOR_FAILED:
3655                 return;
3656         }
3657
3658
3659         /* get the vnnmap */
3660         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3661         if (ret != 0) {
3662                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3663                 return;
3664         }
3665
3666         if (rec->need_recovery) {
3667                 /* a previous recovery didn't finish */
3668                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3669                 return;
3670         }
3671
3672         /* verify that all active nodes are in normal mode 
3673            and not in recovery mode 
3674         */
3675         switch (verify_recmode(ctdb, nodemap)) {
3676         case MONITOR_RECOVERY_NEEDED:
3677                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3678                 return;
3679         case MONITOR_FAILED:
3680                 return;
3681         case MONITOR_ELECTION_NEEDED:
3682                 /* can not happen */
3683         case MONITOR_OK:
3684                 break;
3685         }
3686
3687
3688         if (ctdb->recovery_lock_file != NULL) {
3689                 /* We must already hold the recovery lock */
3690                 if (!ctdb_recovery_have_lock(ctdb)) {
3691                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3692                         ctdb_set_culprit(rec, ctdb->pnn);
3693                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3694                         return;
3695                 }
3696         }
3697
3698
3699         /* if there are takeovers requested, perform it and notify the waiters */
3700         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3701             rec->reallocate_requests) {
3702                 process_ipreallocate_requests(ctdb, rec);
3703         }
3704
3705         /* If recoveries are disabled then there is no use doing any
3706          * nodemap or flags checks.  Recoveries might be disabled due
3707          * to "reloadnodes", so doing these checks might cause an
3708          * unnecessary recovery.  */
3709         if (ctdb_op_is_disabled(rec->recovery)) {
3710                 return;
3711         }
3712
3713         /* get the nodemap for all active remote nodes
3714          */
3715         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3716         if (remote_nodemaps == NULL) {
3717                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3718                 return;
3719         }
3720         for(i=0; i<nodemap->num; i++) {
3721                 remote_nodemaps[i] = NULL;
3722         }
3723         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3724                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3725                 return;
3726         } 
3727
3728         /* verify that all other nodes have the same nodemap as we have
3729         */
3730         for (j=0; j<nodemap->num; j++) {
3731                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3732                         continue;
3733                 }
3734
3735                 if (remote_nodemaps[j] == NULL) {
3736                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3737                         ctdb_set_culprit(rec, j);
3738
3739                         return;
3740                 }
3741
3742                 /* if the nodes disagree on how many nodes there are
3743                    then this is a good reason to try recovery
3744                  */
3745                 if (remote_nodemaps[j]->num != nodemap->num) {
3746                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3747                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3748                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3749                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3750                         return;
3751                 }
3752
3753                 /* if the nodes disagree on which nodes exist and are
3754                    active, then that is also a good reason to do recovery
3755                  */
3756                 for (i=0;i<nodemap->num;i++) {
3757                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3758                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3759                                           nodemap->nodes[j].pnn, i, 
3760                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3761                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3762                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3763                                             vnnmap);
3764                                 return;
3765                         }
3766                 }
3767         }
3768
3769         /*
3770          * Update node flags obtained from each active node. This ensure we have
3771          * up-to-date information for all the nodes.
3772          */
3773         for (j=0; j<nodemap->num; j++) {
3774                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3775                         continue;
3776                 }
3777                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3778         }
3779
3780         for (j=0; j<nodemap->num; j++) {
3781                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3782                         continue;
3783                 }
3784
3785                 /* verify the flags are consistent
3786                 */
3787                 for (i=0; i<nodemap->num; i++) {
3788                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3789                                 continue;
3790                         }
3791                         
3792                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3793                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3794                                   nodemap->nodes[j].pnn, 
3795                                   nodemap->nodes[i].pnn, 
3796                                   remote_nodemaps[j]->nodes[i].flags,
3797                                   nodemap->nodes[i].flags));
3798                                 if (i == j) {
3799                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3800                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3801                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3802                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3803                                                     vnnmap);
3804                                         return;
3805                                 } else {
3806                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3807                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3808                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3809                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3810                                                     vnnmap);
3811                                         return;
3812                                 }
3813                         }
3814                 }
3815         }
3816
3817
3818         /* count how many active nodes there are */
3819         num_lmasters  = 0;
3820         for (i=0; i<nodemap->num; i++) {
3821                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3822                         if (ctdb_node_has_capabilities(rec->caps,
3823                                                        ctdb->nodes[i]->pnn,
3824                                                        CTDB_CAP_LMASTER)) {
3825                                 num_lmasters++;
3826                         }
3827                 }
3828         }
3829
3830
3831         /* There must be the same number of lmasters in the vnn map as
3832          * there are active nodes with the lmaster capability...  or
3833          * do a recovery.
3834          */
3835         if (vnnmap->size != num_lmasters) {
3836                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3837                           vnnmap->size, num_lmasters));
3838                 ctdb_set_culprit(rec, ctdb->pnn);
3839                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3840                 return;
3841         }
3842
3843         /* verify that all active nodes in the nodemap also exist in 
3844            the vnnmap.
3845          */
3846         for (j=0; j<nodemap->num; j++) {
3847                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3848                         continue;
3849                 }
3850                 if (nodemap->nodes[j].pnn == pnn) {
3851                         continue;
3852                 }
3853
3854                 for (i=0; i<vnnmap->size; i++) {
3855                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3856                                 break;
3857                         }
3858                 }
3859                 if (i == vnnmap->size) {
3860                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3861                                   nodemap->nodes[j].pnn));
3862                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3863                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3864                         return;
3865                 }
3866         }
3867
3868         
3869         /* verify that all other nodes have the same vnnmap
3870            and are from the same generation
3871          */
3872         for (j=0; j<nodemap->num; j++) {
3873                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3874                         continue;
3875                 }
3876                 if (nodemap->nodes[j].pnn == pnn) {
3877                         continue;
3878                 }
3879
3880                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3881                                           mem_ctx, &remote_vnnmap);
3882                 if (ret != 0) {
3883                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3884                                   nodemap->nodes[j].pnn));
3885                         return;
3886                 }
3887
3888                 /* verify the vnnmap generation is the same */
3889                 if (vnnmap->generation != remote_vnnmap->generation) {
3890                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3891                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3892                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3893                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3894                         return;
3895                 }
3896
3897                 /* verify the vnnmap size is the same */
3898                 if (vnnmap->size != remote_vnnmap->size) {
3899                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3900                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3901                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3902                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3903                         return;
3904                 }
3905
3906                 /* verify the vnnmap is the same */
3907                 for (i=0;i<vnnmap->size;i++) {
3908                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3909                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3910                                           nodemap->nodes[j].pnn));
3911                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3912                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3913                                             vnnmap);
3914                                 return;
3915                         }
3916                 }
3917         }
3918
3919         /* we might need to change who has what IP assigned */
3920         if (rec->need_takeover_run) {
3921                 /* If takeover run fails, then the offending nodes are
3922                  * assigned ban culprit counts. And we re-try takeover.
3923                  * If takeover run fails repeatedly, the node would get
3924                  * banned.
3925                  */
3926                 do_takeover_run(rec, nodemap, true);
3927         }
3928 }
3929
3930 /*
3931   the main monitoring loop
3932  */
3933 static void monitor_cluster(struct ctdb_context *ctdb)
3934 {
3935         struct ctdb_recoverd *rec;
3936
3937         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3938
3939         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3940         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3941
3942         rec->ctdb = ctdb;
3943         rec->recmaster = CTDB_UNKNOWN_PNN;
3944
3945         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3946         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3947
3948         rec->recovery = ctdb_op_init(rec, "recoveries");
3949         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3950
3951         rec->priority_time = timeval_current();
3952
3953         /* register a message port for sending memory dumps */
3954         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3955
3956         /* when a node is assigned banning credits */
3957         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3958                                         banning_handler, rec);
3959
3960         /* register a message port for recovery elections */
3961         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3962
3963         /* when nodes are disabled/enabled */
3964         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3965
3966         /* when we are asked to puch out a flag change */
3967         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3968
3969         /* register a message port for vacuum fetch */
3970         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3971
3972         /* register a message port for reloadnodes  */
3973         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3974
3975         /* register a message port for performing a takeover run */
3976         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3977
3978         /* register a message port for disabling the ip check for a short while */
3979         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3980
3981         /* register a message port for updating the recovery daemons node assignment for an ip */
3982         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3983
3984         /* register a message port for forcing a rebalance of a node next
3985            reallocation */
3986         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3987
3988         /* Register a message port for disabling takeover runs */
3989         ctdb_client_set_message_handler(ctdb,
3990                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3991                                         disable_takeover_runs_handler, rec);
3992
3993         /* Register a message port for disabling recoveries */
3994         ctdb_client_set_message_handler(ctdb,
3995                                         CTDB_SRVID_DISABLE_RECOVERIES,
3996                                         disable_recoveries_handler, rec);
3997
3998         /* register a message port for detaching database */
3999         ctdb_client_set_message_handler(ctdb,
4000                                         CTDB_SRVID_DETACH_DATABASE,
4001                                         detach_database_handler, rec);
4002
4003         for (;;) {
4004                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4005                 struct timeval start;
4006                 double elapsed;
4007
4008                 if (!mem_ctx) {
4009                         DEBUG(DEBUG_CRIT,(__location__
4010                                           " Failed to create temp context\n"));
4011                         exit(-1);
4012                 }
4013
4014                 start = timeval_current();
4015                 main_loop(ctdb, rec, mem_ctx);
4016                 talloc_free(mem_ctx);
4017
4018                 /* we only check for recovery once every second */
4019                 elapsed = timeval_elapsed(&start);
4020                 if (elapsed < ctdb->tunable.recover_interval) {
4021                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4022                                           - elapsed);
4023                 }
4024         }
4025 }
4026
4027 /*
4028   event handler for when the main ctdbd dies
4029  */
4030 static void ctdb_recoverd_parent(struct tevent_context *ev,
4031                                  struct tevent_fd *fde,
4032                                  uint16_t flags, void *private_data)
4033 {
4034         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4035         _exit(1);
4036 }
4037
4038 /*
4039   called regularly to verify that the recovery daemon is still running
4040  */
4041 static void ctdb_check_recd(struct tevent_context *ev,
4042                             struct tevent_timer *te,
4043                             struct timeval yt, void *p)
4044 {
4045         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4046
4047         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4048                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4049
4050                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
4051                                  ctdb_restart_recd, ctdb);
4052
4053                 return;
4054         }
4055
4056         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4057                          timeval_current_ofs(30, 0),
4058                          ctdb_check_recd, ctdb);
4059 }
4060
4061 static void recd_sig_child_handler(struct tevent_context *ev,
4062                                    struct tevent_signal *se, int signum,
4063                                    int count, void *dont_care,
4064                                    void *private_data)
4065 {
4066 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4067         int status;
4068         pid_t pid = -1;
4069
4070         while (pid != 0) {
4071                 pid = waitpid(-1, &status, WNOHANG);
4072                 if (pid == -1) {
4073                         if (errno != ECHILD) {
4074                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4075                         }
4076                         return;
4077                 }
4078                 if (pid > 0) {
4079                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4080                 }
4081         }
4082 }
4083
4084 /*
4085   startup the recovery daemon as a child of the main ctdb daemon
4086  */
4087 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4088 {
4089         int fd[2];
4090         struct tevent_signal *se;
4091         struct tevent_fd *fde;
4092
4093         if (pipe(fd) != 0) {
4094                 return -1;
4095         }
4096
4097         ctdb->recoverd_pid = ctdb_fork(ctdb);
4098         if (ctdb->recoverd_pid == -1) {
4099                 return -1;
4100         }
4101
4102         if (ctdb->recoverd_pid != 0) {
4103                 talloc_free(ctdb->recd_ctx);
4104                 ctdb->recd_ctx = talloc_new(ctdb);
4105                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4106
4107                 close(fd[0]);
4108                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4109                                  timeval_current_ofs(30, 0),
4110                                  ctdb_check_recd, ctdb);
4111                 return 0;
4112         }
4113
4114         close(fd[1]);
4115
4116         srandom(getpid() ^ time(NULL));
4117
4118         prctl_set_comment("ctdb_recovered");
4119         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4120                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4121                 exit(1);
4122         }
4123
4124         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4125
4126         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4127                             ctdb_recoverd_parent, &fd[0]);
4128         tevent_fd_set_auto_close(fde);
4129
4130         /* set up a handler to pick up sigchld */
4131         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4132                                recd_sig_child_handler, ctdb);
4133         if (se == NULL) {
4134                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4135                 exit(1);
4136         }
4137
4138         monitor_cluster(ctdb);
4139
4140         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4141         return -1;
4142 }
4143
4144 /*
4145   shutdown the recovery daemon
4146  */
4147 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4148 {
4149         if (ctdb->recoverd_pid == 0) {
4150                 return;
4151         }
4152
4153         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4154         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4155
4156         TALLOC_FREE(ctdb->recd_ctx);
4157         TALLOC_FREE(ctdb->recd_ping_count);
4158 }
4159
4160 static void ctdb_restart_recd(struct tevent_context *ev,
4161                               struct tevent_timer *te,
4162                               struct timeval t, void *private_data)
4163 {
4164         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4165
4166         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4167         ctdb_stop_recoverd(ctdb);
4168         ctdb_start_recoverd(ctdb);
4169 }