1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "common/cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "common/system.h"
32
33
34 /* List of SRVID requests that need to be processed */
35 struct srvid_list {
36         struct srvid_list *next, *prev;
37         struct srvid_request *request;
38 };
39
40 struct srvid_requests {
41         struct srvid_list *requests;
42 };
43
44 static void srvid_request_reply(struct ctdb_context *ctdb,
45                                 struct srvid_request *request,
46                                 TDB_DATA result)
47 {
48         /* Someone that sent srvid==0 does not want a reply */
49         if (request->srvid == 0) {
50                 talloc_free(request);
51                 return;
52         }
53
54         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
55                                      result) == 0) {
56                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
57                                   (unsigned)request->pnn,
58                                   (unsigned long long)request->srvid));
59         } else {
60                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
61                                  (unsigned)request->pnn,
62                                  (unsigned long long)request->srvid));
63         }
64
65         talloc_free(request);
66 }
67
68 static void srvid_requests_reply(struct ctdb_context *ctdb,
69                                  struct srvid_requests **requests,
70                                  TDB_DATA result)
71 {
72         struct srvid_list *r;
73
74         for (r = (*requests)->requests; r != NULL; r = r->next) {
75                 srvid_request_reply(ctdb, r->request, result);
76         }
77
78         /* Free the list structure... */
79         TALLOC_FREE(*requests);
80 }
81
82 static void srvid_request_add(struct ctdb_context *ctdb,
83                               struct srvid_requests **requests,
84                               struct srvid_request *request)
85 {
86         struct srvid_list *t;
87         int32_t ret;
88         TDB_DATA result;
89
90         if (*requests == NULL) {
91                 *requests = talloc_zero(ctdb, struct srvid_requests);
92                 if (*requests == NULL) {
93                         goto nomem;
94                 }
95         }
96
97         t = talloc_zero(*requests, struct srvid_list);
98         if (t == NULL) {
99                 /* If *requests was just allocated above then free it */
100                 if ((*requests)->requests == NULL) {
101                         TALLOC_FREE(*requests);
102                 }
103                 goto nomem;
104         }
105
106         t->request = (struct srvid_request *)talloc_steal(t, request);
107         DLIST_ADD((*requests)->requests, t);
108
109         return;
110
111 nomem:
112         /* Failed to add the request to the list.  Send a fail. */
113         DEBUG(DEBUG_ERR, (__location__
114                           " Out of memory, failed to queue SRVID request\n"));
115         ret = -ENOMEM;
116         result.dsize = sizeof(ret);
117         result.dptr = (uint8_t *)&ret;
118         srvid_request_reply(ctdb, request, result);
119 }
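/* Intended flow (a summary of the helpers above, not new API): a message
 * handler queues an incoming request with srvid_request_add(); when the
 * deferred operation finally completes, srvid_requests_reply() answers
 * every queued caller in one pass and frees the whole list via talloc.
 */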
120
121 /* An abstraction to allow an operation (takeover runs, recoveries,
122  * ...) to be disabled for a given timeout */
123 struct ctdb_op_state {
124         struct tevent_timer *timer;
125         bool in_progress;
126         const char *name;
127 };
128
129 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
130 {
131         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
132
133         if (state != NULL) {
134                 state->in_progress = false;
135                 state->name = name;
136         }
137
138         return state;
139 }
140
141 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
142 {
143         return state->timer != NULL;
144 }
145
146 static bool ctdb_op_begin(struct ctdb_op_state *state)
147 {
148         if (ctdb_op_is_disabled(state)) {
149                 DEBUG(DEBUG_NOTICE,
150                       ("Unable to begin - %s are disabled\n", state->name));
151                 return false;
152         }
153
154         state->in_progress = true;
155         return true;
156 }
157
158 static bool ctdb_op_end(struct ctdb_op_state *state)
159 {
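        /* The assignment expression evaluates to false, so this clears
         * the flag and returns false in one statement. */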
160         return state->in_progress = false;
161 }
162
163 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
164 {
165         return state->in_progress;
166 }
167
168 static void ctdb_op_enable(struct ctdb_op_state *state)
169 {
170         TALLOC_FREE(state->timer);
171 }
172
173 static void ctdb_op_timeout_handler(struct event_context *ev,
174                                     struct timed_event *te,
175                                     struct timeval yt, void *p)
176 {
177         struct ctdb_op_state *state =
178                 talloc_get_type(p, struct ctdb_op_state);
179
180         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
181         ctdb_op_enable(state);
182 }
183
184 static int ctdb_op_disable(struct ctdb_op_state *state,
185                            struct tevent_context *ev,
186                            uint32_t timeout)
187 {
188         if (timeout == 0) {
189                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
190                 ctdb_op_enable(state);
191                 return 0;
192         }
193
194         if (state->in_progress) {
195                 DEBUG(DEBUG_ERR,
196                       ("Unable to disable %s - in progress\n", state->name));
197                 return -EAGAIN;
198         }
199
200         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
201                             state->name, timeout));
202
203         /* Clear any old timers */
204         talloc_free(state->timer);
205
206         /* Arrange for the timeout to occur */
207         state->timer = tevent_add_timer(ev, state,
208                                         timeval_current_ofs(timeout, 0),
209                                         ctdb_op_timeout_handler, state);
210         if (state->timer == NULL) {
211                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
212                 return -ENOMEM;
213         }
214
215         return 0;
216 }
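/* Sketch of how the ctdb_op_state helpers above fit together: a control
 * handler calls ctdb_op_disable(state, ev, timeout) to suspend an
 * operation (timeout 0 re-enables it immediately); the timer re-enables
 * it via ctdb_op_timeout_handler().  Code performing the operation
 * brackets it with ctdb_op_begin()/ctdb_op_end(), and ctdb_op_begin()
 * refuses to start while the operation is disabled.
 */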
217
218 struct ctdb_banning_state {
219         uint32_t count;
220         struct timeval last_reported_time;
221 };
222
223 /*
224   private state of recovery daemon
225  */
226 struct ctdb_recoverd {
227         struct ctdb_context *ctdb;
228         uint32_t recmaster;
229         uint32_t last_culprit_node;
230         struct ctdb_node_map *nodemap;
231         struct timeval priority_time;
232         bool need_takeover_run;
233         bool need_recovery;
234         uint32_t node_flags;
235         struct timed_event *send_election_te;
236         struct timed_event *election_timeout;
237         struct srvid_requests *reallocate_requests;
238         struct ctdb_op_state *takeover_run;
239         struct ctdb_op_state *recovery;
240         struct ctdb_control_get_ifaces *ifaces;
241         uint32_t *force_rebalance_nodes;
242         struct ctdb_node_capabilities *caps;
243 };
244
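/* Control and monitor timeouts are driven by the recover_timeout and
 * recover_interval tunable fields (assumption: these correspond to the
 * usual RecoverTimeout/RecoverInterval tunables).
 */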
245 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
246 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
247
248 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
249
250 /*
251   ban a node for a period of time
252  */
253 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
254 {
255         int ret;
256         struct ctdb_context *ctdb = rec->ctdb;
257         struct ctdb_ban_time bantime;
258        
259         if (!ctdb_validate_pnn(ctdb, pnn)) {
260                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
261                 return;
262         }
263
264         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
265
266         bantime.pnn  = pnn;
267         bantime.time = ban_time;
268
269         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
270         if (ret != 0) {
271                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
272                 return;
273         }
274
275 }
276
277 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
278
279
280 /*
281   remember the trouble maker
282  */
283 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
284 {
285         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
286         struct ctdb_banning_state *ban_state;
287
288         if (culprit >= ctdb->num_nodes) {
289                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
290                 return;
291         }
292
293         /* If we are banned or stopped, do not set other nodes as culprits */
294         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
295                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
296                 return;
297         }
298
299         if (ctdb->nodes[culprit]->ban_state == NULL) {
300                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
301                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
302
303                 
304         }
305         ban_state = ctdb->nodes[culprit]->ban_state;
306         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
307                 /* this was the first time in a long while this node
308                    misbehaved so we will forgive any old transgressions.
309                 */
310                 ban_state->count = 0;
311         }
312
313         ban_state->count += count;
314         ban_state->last_reported_time = timeval_current();
315         rec->last_culprit_node = culprit;
316 }
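/* The counts accumulated here feed the banning logic elsewhere in the
 * recovery daemon, which bans a node once its culprit count passes a
 * threshold (that code is not part of this excerpt).
 */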
317
318 /*
319   remember the trouble maker
320  */
321 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
322 {
323         ctdb_set_culprit_count(rec, culprit, 1);
324 }
325
326
327 /* this callback is called for every node that failed to execute the
328    recovered event
329 */
330 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
331 {
332         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
333
334         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
335
336         ctdb_set_culprit(rec, node_pnn);
337 }
338
339 /*
340   run the "recovered" eventscript on all nodes
341  */
342 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
343 {
344         TALLOC_CTX *tmp_ctx;
345         uint32_t *nodes;
346         struct ctdb_context *ctdb = rec->ctdb;
347
348         tmp_ctx = talloc_new(ctdb);
349         CTDB_NO_MEMORY(ctdb, tmp_ctx);
350
351         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
352         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
353                                         nodes, 0,
354                                         CONTROL_TIMEOUT(), false, tdb_null,
355                                         NULL, recovered_fail_callback,
356                                         rec) != 0) {
357                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
358
359                 talloc_free(tmp_ctx);
360                 return -1;
361         }
362
363         talloc_free(tmp_ctx);
364         return 0;
365 }
366
367 /* this callback is called for every node that failed to execute the
368    start recovery event
369 */
370 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
371 {
372         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
373
374         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
375
376         ctdb_set_culprit(rec, node_pnn);
377 }
378
379 /*
380   run the "startrecovery" eventscript on all nodes
381  */
382 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
383 {
384         TALLOC_CTX *tmp_ctx;
385         uint32_t *nodes;
386         struct ctdb_context *ctdb = rec->ctdb;
387
388         tmp_ctx = talloc_new(ctdb);
389         CTDB_NO_MEMORY(ctdb, tmp_ctx);
390
391         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
392         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
393                                         nodes, 0,
394                                         CONTROL_TIMEOUT(), false, tdb_null,
395                                         NULL,
396                                         startrecovery_fail_callback,
397                                         rec) != 0) {
398                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
399                 talloc_free(tmp_ctx);
400                 return -1;
401         }
402
403         talloc_free(tmp_ctx);
404         return 0;
405 }
406
407 /*
408   update the node capabilities for all connected nodes
409  */
410 static int update_capabilities(struct ctdb_recoverd *rec,
411                                struct ctdb_node_map *nodemap)
412 {
413         uint32_t *capp;
414         TALLOC_CTX *tmp_ctx;
415         struct ctdb_node_capabilities *caps;
416         struct ctdb_context *ctdb = rec->ctdb;
417
418         tmp_ctx = talloc_new(rec);
419         CTDB_NO_MEMORY(ctdb, tmp_ctx);
420
421         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
422                                      CONTROL_TIMEOUT(), nodemap);
423
424         if (caps == NULL) {
425                 DEBUG(DEBUG_ERR,
426                       (__location__ " Failed to get node capabilities\n"));
427                 talloc_free(tmp_ctx);
428                 return -1;
429         }
430
431         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
432         if (capp == NULL) {
433                 DEBUG(DEBUG_ERR,
434                       (__location__
435                        " Capabilities don't include current node.\n"));
436                 talloc_free(tmp_ctx);
437                 return -1;
438         }
439         ctdb->capabilities = *capp;
440
441         TALLOC_FREE(rec->caps);
442         rec->caps = talloc_steal(rec, caps);
443
444         talloc_free(tmp_ctx);
445         return 0;
446 }
447
448 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
449 {
450         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
451
452         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
453         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
454 }
455
456 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
457 {
458         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
459
460         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
461         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
462 }
463
464 /*
465   change recovery mode on all nodes
466  */
467 static int set_recovery_mode(struct ctdb_context *ctdb,
468                              struct ctdb_recoverd *rec,
469                              struct ctdb_node_map *nodemap,
470                              uint32_t rec_mode, bool freeze)
471 {
472         TDB_DATA data;
473         uint32_t *nodes;
474         TALLOC_CTX *tmp_ctx;
475
476         tmp_ctx = talloc_new(ctdb);
477         CTDB_NO_MEMORY(ctdb, tmp_ctx);
478
479         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
480
481         data.dsize = sizeof(uint32_t);
482         data.dptr = (unsigned char *)&rec_mode;
483
484         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
485                                         nodes, 0,
486                                         CONTROL_TIMEOUT(),
487                                         false, data,
488                                         NULL, NULL,
489                                         NULL) != 0) {
490                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
491                 talloc_free(tmp_ctx);
492                 return -1;
493         }
494
495         /* freeze all nodes, one database priority level at a time */
496         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
497                 int i;
498
499                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
500                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
501                                                 nodes, i,
502                                                 CONTROL_TIMEOUT(),
503                                                 false, tdb_null,
504                                                 NULL,
505                                                 set_recmode_fail_callback,
506                                                 rec) != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
508                                 talloc_free(tmp_ctx);
509                                 return -1;
510                         }
511                 }
512         }
513
514         talloc_free(tmp_ctx);
515         return 0;
516 }
517
518 /*
519   change recovery master on all nodes
520  */
521 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
522 {
523         TDB_DATA data;
524         TALLOC_CTX *tmp_ctx;
525         uint32_t *nodes;
526
527         tmp_ctx = talloc_new(ctdb);
528         CTDB_NO_MEMORY(ctdb, tmp_ctx);
529
530         data.dsize = sizeof(uint32_t);
531         data.dptr = (unsigned char *)&pnn;
532
533         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
534         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
535                                         nodes, 0,
536                                         CONTROL_TIMEOUT(), false, data,
537                                         NULL, NULL,
538                                         NULL) != 0) {
539                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
540                 talloc_free(tmp_ctx);
541                 return -1;
542         }
543
544         talloc_free(tmp_ctx);
545         return 0;
546 }
547
548 /* update all remote nodes to use the same db priority that we have
549    this can fail if the remote node has not yet been upgraded to
550    support this function, so we always return success and never fail
551    a recovery if this call fails.
552 */
553 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
554         struct ctdb_node_map *nodemap, 
555         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
556 {
557         int db;
558
559         /* step through all local databases */
560         for (db=0; db<dbmap->num;db++) {
561                 struct ctdb_db_priority db_prio;
562                 int ret;
563
564                 db_prio.db_id     = dbmap->dbs[db].dbid;
565                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
566                 if (ret != 0) {
567                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
568                         continue;
569                 }
570
571                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
572
573                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
574                                                 CTDB_CURRENT_NODE, &db_prio);
575                 if (ret != 0) {
576                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
577                                          db_prio.db_id));
578                 }
579         }
580
581         return 0;
582 }                       
583
584 /*
585   ensure all other nodes have attached to any databases that we have
586  */
587 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
588                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
589 {
590         int i, j, db, ret;
591         struct ctdb_dbid_map *remote_dbmap;
592
593         /* verify that all other nodes have all our databases */
594         for (j=0; j<nodemap->num; j++) {
595         /* we don't need to check ourselves */
596                 if (nodemap->nodes[j].pnn == pnn) {
597                         continue;
598                 }
599         /* don't check nodes that are unavailable */
600                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
601                         continue;
602                 }
603
604                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
605                                          mem_ctx, &remote_dbmap);
606                 if (ret != 0) {
607                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
608                         return -1;
609                 }
610
611                 /* step through all local databases */
612                 for (db=0; db<dbmap->num;db++) {
613                         const char *name;
614
615
616                         for (i=0;i<remote_dbmap->num;i++) {
617                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
618                                         break;
619                                 }
620                         }
621                         /* the remote node already has this database */
622                         if (i!=remote_dbmap->num) {
623                                 continue;
624                         }
625                         /* ok so we need to create this database */
626                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
627                                                   dbmap->dbs[db].dbid, mem_ctx,
628                                                   &name);
629                         if (ret != 0) {
630                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
631                                 return -1;
632                         }
633                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
634                                                  nodemap->nodes[j].pnn,
635                                                  mem_ctx, name,
636                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
637                         if (ret != 0) {
638                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
639                                 return -1;
640                         }
641                 }
642         }
643
644         return 0;
645 }
646
647
648 /*
649   ensure we are attached to any databases that anyone else is attached to
650  */
651 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
652                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
653 {
654         int i, j, db, ret;
655         struct ctdb_dbid_map *remote_dbmap;
656
657         /* verify that we have all databases any other node has */
658         for (j=0; j<nodemap->num; j++) {
659         /* we don't need to check ourselves */
660                 if (nodemap->nodes[j].pnn == pnn) {
661                         continue;
662                 }
663         /* don't check nodes that are unavailable */
664                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
665                         continue;
666                 }
667
668                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
669                                          mem_ctx, &remote_dbmap);
670                 if (ret != 0) {
671                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
672                         return -1;
673                 }
674
675                 /* step through all databases on the remote node */
676                 for (db=0; db<remote_dbmap->num;db++) {
677                         const char *name;
678
679                         for (i=0;i<(*dbmap)->num;i++) {
680                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
681                                         break;
682                                 }
683                         }
684                         /* we already have this db locally */
685                         if (i!=(*dbmap)->num) {
686                                 continue;
687                         }
688                         /* ok so we need to create this database and
689                            rebuild dbmap
690                          */
691                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
692                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
693                         if (ret != 0) {
694                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
695                                           nodemap->nodes[j].pnn));
696                                 return -1;
697                         }
698                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
699                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
700                         if (ret != 0) {
701                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
702                                 return -1;
703                         }
704                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
705                         if (ret != 0) {
706                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
707                                 return -1;
708                         }
709                 }
710         }
711
712         return 0;
713 }
714
715
716 /*
717   pull the remote database contents from one node into the recdb
718  */
719 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
720                                     struct tdb_wrap *recdb, uint32_t dbid)
721 {
722         int ret;
723         TDB_DATA outdata;
724         struct ctdb_marshall_buffer *reply;
725         struct ctdb_rec_data *recdata;
726         int i;
727         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
728
729         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
730                                CONTROL_TIMEOUT(), &outdata);
731         if (ret != 0) {
732                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
733                 talloc_free(tmp_ctx);
734                 return -1;
735         }
736
737         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
738
739         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
740                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
741                 talloc_free(tmp_ctx);
742                 return -1;
743         }
744
745         recdata = (struct ctdb_rec_data *)&reply->data[0];
746
747         for (i=0;
748              i<reply->count;
749              recdata = (struct ctdb_rec_data *)(recdata->length + (uint8_t *)recdata), i++) {
750                 TDB_DATA key, data;
751                 struct ctdb_ltdb_header *hdr;
752                 TDB_DATA existing;
753
754                 key.dptr = &recdata->data[0];
755                 key.dsize = recdata->keylen;
756                 data.dptr = &recdata->data[key.dsize];
757                 data.dsize = recdata->datalen;
758
759                 hdr = (struct ctdb_ltdb_header *)data.dptr;
760
761                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
762                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
763                         talloc_free(tmp_ctx);
764                         return -1;
765                 }
766
767                 /* fetch the existing record, if any */
768                 existing = tdb_fetch(recdb->tdb, key);
769
770                 if (existing.dptr != NULL) {
771                         struct ctdb_ltdb_header header;
772                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
773                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
774                                          (unsigned)existing.dsize, srcnode));
775                                 free(existing.dptr);
776                                 talloc_free(tmp_ctx);
777                                 return -1;
778                         }
779                         header = *(struct ctdb_ltdb_header *)existing.dptr;
780                         free(existing.dptr);
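                        /* Keep whichever copy has the higher RSN; on a
                         * tie, take the incoming copy unless this node
                         * is already the dmaster of the existing one. */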
781                         if (!(header.rsn < hdr->rsn ||
782                               (header.dmaster != ctdb_get_pnn(ctdb) &&
783                                header.rsn == hdr->rsn))) {
784                                 continue;
785                         }
786                 }
787
788                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
789                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
790                         talloc_free(tmp_ctx);
791                         return -1;
792                 }
793         }
794
795         talloc_free(tmp_ctx);
796
797         return 0;
798 }
799
800
801 struct pull_seqnum_cbdata {
802         int failed;
803         uint32_t pnn;
804         uint64_t seqnum;
805 };
806
807 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
808 {
809         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
810         uint64_t seqnum;
811
812         if (cb_data->failed != 0) {
813                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
814                 return;
815         }
816
817         if (res != 0) {
818                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
819                 cb_data->failed = 1;
820                 return;
821         }
822
823         if (outdata.dsize != sizeof(uint64_t)) {
824                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
825                 cb_data->failed = 1;
826                 return;
827         }
828
829         seqnum = *((uint64_t *)outdata.dptr);
830
831         if (seqnum > cb_data->seqnum ||
832             (cb_data->pnn == -1 && seqnum == 0)) {
833                 cb_data->seqnum = seqnum;
834                 cb_data->pnn = node_pnn;
835         }
836 }
837
838 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
839 {
840         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
841
842         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
843         cb_data->failed = 1;
844 }
845
846 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
847                                 struct ctdb_recoverd *rec, 
848                                 struct ctdb_node_map *nodemap, 
849                                 struct tdb_wrap *recdb, uint32_t dbid)
850 {
851         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
852         uint32_t *nodes;
853         TDB_DATA data;
854         uint32_t outdata[2];
855         struct pull_seqnum_cbdata *cb_data;
856
857         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
858
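        /* The GET_DB_SEQNUM control takes a 64-bit blob holding the
         * database id; assemble it here from two 32-bit words (dbid
         * plus a zero filler word). */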
859         outdata[0] = dbid;
860         outdata[1] = 0;
861
862         data.dsize = sizeof(outdata);
863         data.dptr  = (uint8_t *)&outdata[0];
864
865         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
866         if (cb_data == NULL) {
867                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
868                 talloc_free(tmp_ctx);
869                 return -1;
870         }
871
872         cb_data->failed = 0;
873         cb_data->pnn    = -1;
874         cb_data->seqnum = 0;
875         
876         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
877         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
878                                         nodes, 0,
879                                         CONTROL_TIMEOUT(), false, data,
880                                         pull_seqnum_cb,
881                                         pull_seqnum_fail_cb,
882                                         cb_data) != 0) {
883                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
884
885                 talloc_free(tmp_ctx);
886                 return -1;
887         }
888
889         if (cb_data->failed != 0) {
890                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
891                 talloc_free(tmp_ctx);
892                 return -1;
893         }
894
895         if (cb_data->pnn == -1) {
896                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
897                 talloc_free(tmp_ctx);
898                 return -1;
899         }
900
901         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
902
903         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
904                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
905                 talloc_free(tmp_ctx);
906                 return -1;
907         }
908
909         talloc_free(tmp_ctx);
910         return 0;
911 }
912
913
914 /*
915   pull all the remote database contents into the recdb
916  */
917 static int pull_remote_database(struct ctdb_context *ctdb,
918                                 struct ctdb_recoverd *rec, 
919                                 struct ctdb_node_map *nodemap, 
920                                 struct tdb_wrap *recdb, uint32_t dbid,
921                                 bool persistent)
922 {
923         int j;
924
925         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
926                 int ret;
927                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
928                 if (ret == 0) {
929                         return 0;
930                 }
931         }
932
933         /* pull all records from all other nodes across onto this node
934            (this merges based on rsn)
935         */
936         for (j=0; j<nodemap->num; j++) {
937                 /* don't merge from nodes that are unavailable */
938                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
939                         continue;
940                 }
941                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
942                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
943                                  nodemap->nodes[j].pnn));
944                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
945                         return -1;
946                 }
947         }
948         
949         return 0;
950 }
951
952
953 /*
954   update flags on all active nodes
955  */
956 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
957 {
958         int ret;
959
960         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
961                 if (ret != 0) {
962         if (ret != 0) {
963                 return -1;
964         }
965
966         return 0;
967 }
968
969 /*
970   ensure all nodes have the same vnnmap we do
971  */
972 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
973                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
974 {
975         int j, ret;
976
977         /* push the new vnn map out to all the nodes */
978         for (j=0; j<nodemap->num; j++) {
979                 /* don't push to nodes that are unavailable */
980                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
981                         continue;
982                 }
983
984                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
985                 if (ret != 0) {
986                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
987                         return -1;
988                 }
989         }
990
991         return 0;
992 }
993
994
995 /*
996   called when a vacuum fetch has completed - just free it and do the next one
997  */
998 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
999 {
1000         talloc_free(state);
1001 }
1002
1003
1004 /**
1005  * Process one element of the vacuum fetch list:
1006  * Migrate it over to us with the special flag
1007  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
1008  */
1009 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
1010                                      uint32_t pnn,
1011                                      struct ctdb_rec_data *r)
1012 {
1013         struct ctdb_client_call_state *state;
1014         TDB_DATA data;
1015         struct ctdb_ltdb_header *hdr;
1016         struct ctdb_call call;
1017
1018         ZERO_STRUCT(call);
1019         call.call_id = CTDB_NULL_FUNC;
1020         call.flags = CTDB_IMMEDIATE_MIGRATION;
1021         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1022
1023         call.key.dptr = &r->data[0];
1024         call.key.dsize = r->keylen;
1025
1026         /* ensure we don't block this daemon - just skip a record if we can't get
1027            the chainlock */
1028         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1029                 return true;
1030         }
1031
1032         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1033         if (data.dptr == NULL) {
1034                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1035                 return true;
1036         }
1037
1038         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1039                 free(data.dptr);
1040                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1041                 return true;
1042         }
1043
1044         hdr = (struct ctdb_ltdb_header *)data.dptr;
1045         if (hdr->dmaster == pnn) {
1046                 /* it's already local */
1047                 free(data.dptr);
1048                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1049                 return true;
1050         }
1051
1052         free(data.dptr);
1053
1054         state = ctdb_call_send(ctdb_db, &call);
1055         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1056         if (state == NULL) {
1057                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1058                 return false;
1059         }
1060         state->async.fn = vacuum_fetch_callback;
1061         state->async.private_data = NULL;
1062
1063         return true;
1064 }
1065
1066
1067 /*
1068   handler for vacuum fetch
1069 */
1070 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1071                                  void *private_data)
1072 {
1073         struct ctdb_recoverd *rec = talloc_get_type(
1074                 private_data, struct ctdb_recoverd);
1075         struct ctdb_context *ctdb = rec->ctdb;
1076         struct ctdb_marshall_buffer *recs;
1077         int ret, i;
1078         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1079         const char *name;
1080         struct ctdb_dbid_map *dbmap=NULL;
1081         bool persistent = false;
1082         struct ctdb_db_context *ctdb_db;
1083         struct ctdb_rec_data *r;
1084
1085         recs = (struct ctdb_marshall_buffer *)data.dptr;
1086
1087         if (recs->count == 0) {
1088                 goto done;
1089         }
1090
1091         /* work out if the database is persistent */
1092         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1093         if (ret != 0) {
1094                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1095                 goto done;
1096         }
1097
1098         for (i=0;i<dbmap->num;i++) {
1099                 if (dbmap->dbs[i].dbid == recs->db_id) {
1100                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1101                         break;
1102                 }
1103         }
1104         if (i == dbmap->num) {
1105                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1106                 goto done;
1107         }
1108
1109         /* find the name of this database */
1110         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1111                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1112                 goto done;
1113         }
1114
1115         /* attach to it */
1116         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1117         if (ctdb_db == NULL) {
1118                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1119                 goto done;
1120         }
1121
1122         r = (struct ctdb_rec_data *)&recs->data[0];
1123         while (recs->count) {
1124                 bool ok;
1125
1126                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1127                 if (!ok) {
1128                         break;
1129                 }
1130
1131                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
1132                 recs->count--;
1133         }
1134
1135 done:
1136         talloc_free(tmp_ctx);
1137 }
1138
1139
1140 /*
1141  * handler for database detach
1142  */
1143 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1144                                     void *private_data)
1145 {
1146         struct ctdb_recoverd *rec = talloc_get_type(
1147                 private_data, struct ctdb_recoverd);
1148         struct ctdb_context *ctdb = rec->ctdb;
1149         uint32_t db_id;
1150         struct ctdb_db_context *ctdb_db;
1151
1152         if (data.dsize != sizeof(db_id)) {
1153                 return;
1154         }
1155         db_id = *(uint32_t *)data.dptr;
1156
1157         ctdb_db = find_ctdb_db(ctdb, db_id);
1158         if (ctdb_db == NULL) {
1159                 /* database is not attached */
1160                 return;
1161         }
1162
1163         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1164
1165         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1166                              ctdb_db->db_name));
1167         talloc_free(ctdb_db);
1168 }
1169
1170 /*
1171   called when ctdb_wait_timeout should finish
1172  */
1173 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1174                               struct timeval yt, void *p)
1175 {
1176         uint32_t *timed_out = (uint32_t *)p;
1177         (*timed_out) = 1;
1178 }
1179
1180 /*
1181   wait for a given number of seconds
1182  */
1183 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1184 {
1185         uint32_t timed_out = 0;
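        /* split the fractional part of secs into microseconds for the
         * timer below */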
1186         time_t usecs = (secs - (time_t)secs) * 1000000;
1187         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1188         while (!timed_out) {
1189                 event_loop_once(ctdb->ev);
1190         }
1191 }
1192
1193 /*
1194   called when an election times out (ends)
1195  */
1196 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1197                                   struct timeval t, void *p)
1198 {
1199         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1200         rec->election_timeout = NULL;
1201         fast_start = false;
1202
1203         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1204 }
1205
1206
1207 /*
1208   wait for an election to finish. It finished election_timeout seconds after
1209   the last election packet is received
1210  */
1211 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1212 {
1213         struct ctdb_context *ctdb = rec->ctdb;
1214         while (rec->election_timeout) {
1215                 event_loop_once(ctdb->ev);
1216         }
1217 }
1218
1219 /*
1220   Update our local flags from all remote connected nodes. 
1221   This is only run when we are, or believe we are, the recovery master
1222  */
1223 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1224 {
1225         int j;
1226         struct ctdb_context *ctdb = rec->ctdb;
1227         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1228
1229         /* get the nodemap for all active remote nodes and verify
1230            they are the same as for this node
1231          */
1232         for (j=0; j<nodemap->num; j++) {
1233                 struct ctdb_node_map *remote_nodemap=NULL;
1234                 int ret;
1235
1236                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1237                         continue;
1238                 }
1239                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1240                         continue;
1241                 }
1242
1243                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1244                                            mem_ctx, &remote_nodemap);
1245                 if (ret != 0) {
1246                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1247                                   nodemap->nodes[j].pnn));
1248                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1249                         talloc_free(mem_ctx);
1250                         return MONITOR_FAILED;
1251                 }
1252                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1253                         /* We should tell our daemon about this so it
1254                            updates its flags or else we will log the same 
1255                            message again in the next iteration of recovery.
1256                            Since we are the recovery master we can just as
1257                            well update the flags on all nodes.
1258                         */
1259                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1260                         if (ret != 0) {
1261                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1262                                 return MONITOR_FAILED;
1263                         }
1264
1265                         /* Update our local copy of the flags in the recovery
1266                            daemon.
1267                         */
1268                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1269                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1270                                  nodemap->nodes[j].flags));
1271                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1272                 }
1273                 talloc_free(remote_nodemap);
1274         }
1275         talloc_free(mem_ctx);
1276         return MONITOR_OK;
1277 }
1278
1279
1280 /* Create a new random generation id.
1281    The generation id can not be the INVALID_GENERATION id
1282 */
1283 static uint32_t new_generation(void)
1284 {
1285         uint32_t generation;
1286
1287         while (1) {
1288                 generation = random();
1289
1290                 if (generation != INVALID_GENERATION) {
1291                         break;
1292                 }
1293         }
1294
1295         return generation;
1296 }
1297
1298
1299 /*
1300   create a temporary working database
1301  */
1302 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1303 {
1304         char *name;
1305         struct tdb_wrap *recdb;
1306         unsigned tdb_flags;
1307
1308         /* open up the temporary recovery database */
1309         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1310                                ctdb->db_directory_state,
1311                                ctdb->pnn);
1312         if (name == NULL) {
1313                 return NULL;
1314         }
1315         unlink(name);
1316
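        /* Only this process touches this scratch database, so locking
         * can be relaxed (TDB_NOLOCK); under valgrind, avoid mmap too. */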
1317         tdb_flags = TDB_NOLOCK;
1318         if (ctdb->valgrinding) {
1319                 tdb_flags |= TDB_NOMMAP;
1320         }
1321         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1322
1323         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1324                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1325         if (recdb == NULL) {
1326                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1327         }
1328
1329         talloc_free(name);
1330
1331         return recdb;
1332 }
1333
1334
1335 /* 
1336    a traverse function for pulling all relevant records from recdb
1337  */
1338 struct recdb_data {
1339         struct ctdb_context *ctdb;
1340         struct ctdb_marshall_buffer *recdata;
1341         uint32_t len;
1342         uint32_t allocated_len;
1343         bool failed;
1344         bool persistent;
1345 };
1346
1347 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1348 {
1349         struct recdb_data *params = (struct recdb_data *)p;
1350         struct ctdb_rec_data *recdata;
1351         struct ctdb_ltdb_header *hdr;
1352
1353         /*
1354          * skip empty records - but NOT for persistent databases:
1355          *
1356          * The record-by-record mode of recovery deletes empty records.
1357          * For persistent databases, this can lead to data corruption
1358          * by deleting records that should be there:
1359          *
1360          * - Assume the cluster has been running for a while.
1361          *
1362          * - A record R in a persistent database has been created and
1363          *   deleted a couple of times, the last operation being deletion,
1364          *   leaving an empty record with a high RSN, say 10.
1365          *
1366          * - Now a node N is turned off.
1367          *
1368          * - This leaves the local copy of the database on node N with the empty
1369          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1370          *   the copy of record R.
1371          *
1372          * - Now the record is created again while node N is turned off.
1373          *   This creates R with RSN = 1 on all nodes except for N.
1374          *
1375          * - Now node N is turned on again. The following recovery will choose
1376          *   the older empty copy of R due to RSN 10 > RSN 1.
1377          *
1378          * ==> Hence the record is gone after the recovery.
1379          *
1380          * On databases like Samba's registry, this can damage the higher-level
1381          * data structures built from the various tdb-level records.
1382          */
1383         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1384                 return 0;
1385         }
1386
1387         /* update the dmaster field to point to us */
1388         hdr = (struct ctdb_ltdb_header *)data.dptr;
1389         if (!params->persistent) {
1390                 hdr->dmaster = params->ctdb->pnn;
1391                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1392         }
1393
1394         /* add the record to the blob ready to send to the nodes */
1395         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1396         if (recdata == NULL) {
1397                 params->failed = true;
1398                 return -1;
1399         }
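        /* Grow the marshall blob with extra slack (the
         * pulldb_preallocation_size tunable field) so we do not have to
         * realloc once per record. */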
1400         if (params->len + recdata->length >= params->allocated_len) {
1401                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1402                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1403         }
1404         if (params->recdata == NULL) {
1405                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1406                          recdata->length + params->len));
1407                 params->failed = true;
1408                 return -1;
1409         }
1410         params->recdata->count++;
1411         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1412         params->len += recdata->length;
1413         talloc_free(recdata);
1414
1415         return 0;
1416 }
1417
1418 /*
1419   push the recdb database out to all nodes
1420  */
1421 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1422                                bool persistent,
1423                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1424 {
1425         struct recdb_data params;
1426         struct ctdb_marshall_buffer *recdata;
1427         TDB_DATA outdata;
1428         TALLOC_CTX *tmp_ctx;
1429         uint32_t *nodes;
1430
1431         tmp_ctx = talloc_new(ctdb);
1432         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1433
1434         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1435         CTDB_NO_MEMORY(ctdb, recdata);
1436
1437         recdata->db_id = dbid;
1438
1439         params.ctdb = ctdb;
1440         params.recdata = recdata;
1441         params.len = offsetof(struct ctdb_marshall_buffer, data);
1442         params.allocated_len = params.len;
1443         params.failed = false;
1444         params.persistent = persistent;
1445
1446         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1447                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1448                 talloc_free(params.recdata);
1449                 talloc_free(tmp_ctx);
1450                 return -1;
1451         }
1452
        if (params.failed) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to marshall recdb records\n"));
                talloc_free(params.recdata);
                talloc_free(tmp_ctx);
                return -1;
        }
1459
1460         recdata = params.recdata;
1461
1462         outdata.dptr = (void *)recdata;
1463         outdata.dsize = params.len;
1464
1465         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1466         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1467                                         nodes, 0,
1468                                         CONTROL_TIMEOUT(), false, outdata,
1469                                         NULL, NULL,
1470                                         NULL) != 0) {
1471                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1472                 talloc_free(recdata);
1473                 talloc_free(tmp_ctx);
1474                 return -1;
1475         }
1476
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
                  dbid, recdata->count));
1479
1480         talloc_free(recdata);
1481         talloc_free(tmp_ctx);
1482
1483         return 0;
1484 }
1485
1486
1487 /*
1488   go through a full recovery on one database 
1489  */
1490 static int recover_database(struct ctdb_recoverd *rec, 
1491                             TALLOC_CTX *mem_ctx,
1492                             uint32_t dbid,
1493                             bool persistent,
1494                             uint32_t pnn, 
1495                             struct ctdb_node_map *nodemap,
1496                             uint32_t transaction_id)
1497 {
1498         struct tdb_wrap *recdb;
1499         int ret;
1500         struct ctdb_context *ctdb = rec->ctdb;
1501         TDB_DATA data;
1502         struct ctdb_control_transdb w;
1503         uint32_t *nodes;
1504
1505         recdb = create_recdb(ctdb, mem_ctx);
1506         if (recdb == NULL) {
1507                 return -1;
1508         }
1509
1510         /* pull all remote databases onto the recdb */
1511         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1512         if (ret != 0) {
1513                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1514                 return -1;
1515         }
1516
1517         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1518
1519         /* wipe all the remote databases. This is safe as we are in a transaction */
1520         w.db_id = dbid;
1521         w.transaction_id = transaction_id;
1522
1523         data.dptr = (void *)&w;
1524         data.dsize = sizeof(w);
1525
1526         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1527         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1528                                         nodes, 0,
1529                                         CONTROL_TIMEOUT(), false, data,
1530                                         NULL, NULL,
1531                                         NULL) != 0) {
1532                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1533                 talloc_free(recdb);
1534                 return -1;
1535         }
1536         
1537         /* push out the correct database. This sets the dmaster and skips 
1538            the empty records */
1539         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1540         if (ret != 0) {
1541                 talloc_free(recdb);
1542                 return -1;
1543         }
1544
1545         /* all done with this database */
1546         talloc_free(recdb);
1547
1548         return 0;
1549 }
1550
1551 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1552                                          struct ctdb_recoverd *rec,
1553                                          struct ctdb_node_map *nodemap,
1554                                          uint32_t *culprit)
1555 {
1556         int j;
1557         int ret;
1558
1559         if (ctdb->num_nodes != nodemap->num) {
                DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%u) != nodemap->num (%u) invalid param\n",
                                  ctdb->num_nodes, nodemap->num));
1562                 if (culprit) {
1563                         *culprit = ctdb->pnn;
1564                 }
1565                 return -1;
1566         }
1567
1568         for (j=0; j<nodemap->num; j++) {
1569                 /* For readability */
1570                 struct ctdb_node *node = ctdb->nodes[j];
1571
1572                 /* release any existing data */
1573                 if (node->known_public_ips) {
1574                         talloc_free(node->known_public_ips);
1575                         node->known_public_ips = NULL;
1576                 }
1577                 if (node->available_public_ips) {
1578                         talloc_free(node->available_public_ips);
1579                         node->available_public_ips = NULL;
1580                 }
1581
1582                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1583                         continue;
1584                 }
1585
1586                 /* Retrieve the list of known public IPs from the node */
1587                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1588                                         CONTROL_TIMEOUT(),
1589                                         node->pnn,
1590                                         ctdb->nodes,
1591                                         0,
1592                                         &node->known_public_ips);
1593                 if (ret != 0) {
1594                         DEBUG(DEBUG_ERR,
1595                               ("Failed to read known public IPs from node: %u\n",
1596                                node->pnn));
1597                         if (culprit) {
1598                                 *culprit = node->pnn;
1599                         }
1600                         return -1;
1601                 }
1602
1603                 if (ctdb->do_checkpublicip &&
1604                     !ctdb_op_is_disabled(rec->takeover_run) &&
1605                     verify_remote_ip_allocation(ctdb,
1606                                                  node->known_public_ips,
1607                                                  node->pnn)) {
1608                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1609                         rec->need_takeover_run = true;
1610                 }
1611
1612                 /* Retrieve the list of available public IPs from the node */
1613                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1614                                         CONTROL_TIMEOUT(),
1615                                         node->pnn,
1616                                         ctdb->nodes,
1617                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1618                                         &node->available_public_ips);
1619                 if (ret != 0) {
1620                         DEBUG(DEBUG_ERR,
1621                               ("Failed to read available public IPs from node: %u\n",
1622                                node->pnn));
1623                         if (culprit) {
1624                                 *culprit = node->pnn;
1625                         }
1626                         return -1;
1627                 }
1628         }
1629
1630         return 0;
1631 }
1632
1633 /* when we start a recovery, make sure all nodes use the same reclock file
1634    setting
1635 */
1636 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1637 {
1638         struct ctdb_context *ctdb = rec->ctdb;
1639         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1640         TDB_DATA data;
1641         uint32_t *nodes;
1642
1643         if (ctdb->recovery_lock_file == NULL) {
1644                 data.dptr  = NULL;
1645                 data.dsize = 0;
1646         } else {
1647                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1648                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1649         }
1650
1651         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1652         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1653                                         nodes, 0,
1654                                         CONTROL_TIMEOUT(),
1655                                         false, data,
1656                                         NULL, NULL,
1657                                         rec) != 0) {
1658                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1659                 talloc_free(tmp_ctx);
1660                 return -1;
1661         }
1662
1663         talloc_free(tmp_ctx);
1664         return 0;
1665 }
1666
1667
1668 /*
 * This callback is called for every node that failed to execute
 * ctdb_takeover_run(); when banning credits are enabled it marks that
 * node as a recovery-fail culprit.
1671  */
1672 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1673 {
1674         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1675
1676         if (callback_data != NULL) {
1677                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1678
1679                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1680
1681                 ctdb_set_culprit(rec, node_pnn);
1682         }
1683 }
1684
1685
1686 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1687 {
1688         struct ctdb_context *ctdb = rec->ctdb;
1689         int i;
1690         struct ctdb_banning_state *ban_state;
1691
1692         *self_ban = false;
1693         for (i=0; i<ctdb->num_nodes; i++) {
1694                 if (ctdb->nodes[i]->ban_state == NULL) {
1695                         continue;
1696                 }
1697                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
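                /* A node is banned only once it has accumulated banning
                 * credits proportional to the cluster size; a single
                 * failure is never enough on its own.
                 */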
1698                 if (ban_state->count < 2*ctdb->num_nodes) {
1699                         continue;
1700                 }
1701
1702                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1703                         ctdb->nodes[i]->pnn, ban_state->count,
1704                         ctdb->tunable.recovery_ban_period));
1705                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1706                 ban_state->count = 0;
1707
1708                 /* Banning ourself? */
1709                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1710                         *self_ban = true;
1711                 }
1712         }
1713 }
1714
1715 static bool do_takeover_run(struct ctdb_recoverd *rec,
1716                             struct ctdb_node_map *nodemap,
1717                             bool banning_credits_on_fail)
1718 {
1719         uint32_t *nodes = NULL;
1720         struct srvid_request_data dtr;
1721         TDB_DATA data;
1722         int i;
1723         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1724         int ret;
1725         bool ok;
1726
1727         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1728
1729         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1730                 DEBUG(DEBUG_ERR, (__location__
                                  " takeover run already in progress\n"));
1732                 ok = false;
1733                 goto done;
1734         }
1735
1736         if (!ctdb_op_begin(rec->takeover_run)) {
1737                 ok = false;
1738                 goto done;
1739         }
1740
1741         /* Disable IP checks (takeover runs, really) on other nodes
1742          * while doing this takeover run.  This will stop those other
         * nodes from triggering takeover runs when they think they should
1744          * be hosting an IP but it isn't yet on an interface.  Don't
1745          * wait for replies since a failure here might cause some
1746          * noise in the logs but will not actually cause a problem.
1747          */
1748         dtr.srvid = 0; /* No reply */
1749         dtr.pnn = -1;
1750
1751         data.dptr  = (uint8_t*)&dtr;
1752         data.dsize = sizeof(dtr);
1753
1754         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1755
1756         /* Disable for 60 seconds.  This can be a tunable later if
1757          * necessary.
1758          */
1759         dtr.data = 60;
1760         for (i = 0; i < talloc_array_length(nodes); i++) {
1761                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1762                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1763                                              data) != 0) {
1764                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1765                 }
1766         }
1767
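        /* rec is passed as callback data only when failures should earn
         * banning credits; takeover_fail_callback() treats a NULL
         * callback_data as "log only".
         */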
1768         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1769                                 rec->force_rebalance_nodes,
1770                                 takeover_fail_callback,
1771                                 banning_credits_on_fail ? rec : NULL);
1772
1773         /* Reenable takeover runs and IP checks on other nodes */
1774         dtr.data = 0;
1775         for (i = 0; i < talloc_array_length(nodes); i++) {
1776                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1777                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1778                                              data) != 0) {
1779                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1780                 }
1781         }
1782
1783         if (ret != 0) {
1784                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1785                 ok = false;
1786                 goto done;
1787         }
1788
1789         ok = true;
1790         /* Takeover run was successful so clear force rebalance targets */
1791         if (rebalance_nodes == rec->force_rebalance_nodes) {
1792                 TALLOC_FREE(rec->force_rebalance_nodes);
1793         } else {
1794                 DEBUG(DEBUG_WARNING,
1795                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1796         }
1797 done:
1798         rec->need_takeover_run = !ok;
1799         talloc_free(nodes);
1800         ctdb_op_end(rec->takeover_run);
1801
1802         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1803         return ok;
1804 }
1805
1806 struct recovery_helper_state {
1807         int fd[2];
1808         pid_t pid;
1809         int result;
1810         bool done;
1811 };
1812
1813 static void ctdb_recovery_handler(struct tevent_context *ev,
1814                                   struct tevent_fd *fde,
1815                                   uint16_t flags, void *private_data)
1816 {
1817         struct recovery_helper_state *state = talloc_get_type_abort(
1818                 private_data, struct recovery_helper_state);
1819         int ret;
1820
1821         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1822         if (ret != sizeof(state->result)) {
1823                 state->result = EPIPE;
1824         }
1825
1826         state->done = true;
1827 }
1828
1829
1830 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1831 {
1832         static char prog[PATH_MAX+1] = "";
1833         const char **args;
1834         struct recovery_helper_state *state;
1835         struct tevent_fd *fde;
1836         int nargs, ret;
1837
1838         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1839                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1840                              "ctdb_recovery_helper")) {
1841                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1842         }
1843
1844         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1845         if (state == NULL) {
1846                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1847                 return -1;
1848         }
1849
1850         state->pid = -1;
1851
1852         ret = pipe(state->fd);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR,
1855                       ("Failed to create pipe for recovery helper\n"));
1856                 goto fail;
1857         }
1858
1859         set_close_on_exec(state->fd[0]);
1860
1861         nargs = 4;
1862         args = talloc_array(state, const char *, nargs);
1863         if (args == NULL) {
1864                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1865                 goto fail;
1866         }
1867
1868         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1869         args[1] = rec->ctdb->daemon.name;
1870         args[2] = talloc_asprintf(args, "%u", new_generation());
1871         args[3] = NULL;
1872
1873         if (args[0] == NULL || args[2] == NULL) {
1874                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1875                 goto fail;
1876         }
1877
1878         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1879                                      args, NULL, NULL, &state->pid)) {
1880                 DEBUG(DEBUG_ERR,
1881                       ("Failed to create child for recovery helper\n"));
1882                 goto fail;
1883         }
1884
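        /* The helper inherited the write end of the pipe across the
         * fork; close it in the parent so the read end sees EOF if the
         * helper dies without writing a result.
         */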
1885         close(state->fd[1]);
1886         state->fd[1] = -1;
1887
1888         state->done = false;
1889
1890         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1891                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1892         if (fde == NULL) {
1893                 goto fail;
1894         }
1895         tevent_fd_set_auto_close(fde);
1896
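        /* Run the event loop until the helper reports back.  This makes
         * the helper effectively synchronous from the recovery daemon's
         * point of view, which is fine because recovery itself is
         * serialised.
         */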
1897         while (!state->done) {
1898                 tevent_loop_once(rec->ctdb->ev);
1899         }
1900
1901         close(state->fd[0]);
1902         state->fd[0] = -1;
1903
1904         if (state->result != 0) {
1905                 goto fail;
1906         }
1907
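        /* The helper has written its result and should be exiting on
         * its own; the SIGKILL below is defensive cleanup rather than
         * the normal termination path.
         */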
1908         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1909         talloc_free(state);
1910         return 0;
1911
1912 fail:
1913         if (state->fd[0] != -1) {
1914                 close(state->fd[0]);
1915         }
1916         if (state->fd[1] != -1) {
1917                 close(state->fd[1]);
1918         }
1919         if (state->pid != -1) {
1920                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1921         }
1922         talloc_free(state);
1923         return -1;
1924 }
1925
1926 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1927                               uint32_t pnn, struct ctdb_node_map *nodemap,
1928                               struct ctdb_vnn_map *vnnmap,
1929                               struct ctdb_dbid_map *dbmap)
1930 {
1931         struct ctdb_context *ctdb = rec->ctdb;
1932         uint32_t generation;
1933         TDB_DATA data;
1934         uint32_t *nodes;
1935         int ret, i, j;
1936
1937         /* set recovery mode to active on all nodes */
1938         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1939         if (ret != 0) {
1940                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1941                 return -1;
1942         }
1943
1944         /* execute the "startrecovery" event script on all nodes */
1945         ret = run_startrecovery_eventscript(rec, nodemap);
1946         if (ret!=0) {
1947                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1948                 return -1;
1949         }
1950
1951         /* pick a new generation number */
1952         generation = new_generation();
1953
        /* change the vnnmap on this node to use the new generation
           number, but not on any other nodes.
           This guarantees that if we abort the recovery prematurely
           for some reason (e.g. a node stops responding), we can just
           return immediately and will re-enter recovery shortly
           afterwards.
           I.e. we deliberately leave the cluster with an inconsistent
           generation id to allow us to abort recovery at any stage and
           just restart it from scratch.
         */
1964         vnnmap->generation = generation;
1965         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1966         if (ret != 0) {
1967                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1968                 return -1;
1969         }
1970
        /* Database generations are updated when the transaction is committed to
1972          * the databases.  So make sure to use the final generation as the
1973          * transaction id
1974          */
1975         generation = new_generation();
1976
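        /* This second generation is used both as the transaction id for
         * the TRANSACTION_START/COMMIT controls below and as the
         * generation of the vnnmap rebuilt after the commit.
         */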
1977         data.dptr = (void *)&generation;
1978         data.dsize = sizeof(uint32_t);
1979
1980         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1981         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1982                                         nodes, 0,
1983                                         CONTROL_TIMEOUT(), false, data,
1984                                         NULL,
1985                                         transaction_start_fail_callback,
1986                                         rec) != 0) {
1987                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1988                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1989                                         nodes, 0,
1990                                         CONTROL_TIMEOUT(), false, tdb_null,
1991                                         NULL,
1992                                         NULL,
1993                                         NULL) != 0) {
1994                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1995                 }
1996                 return -1;
1997         }
1998
1999         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
2000
2001         for (i=0;i<dbmap->num;i++) {
2002                 ret = recover_database(rec, mem_ctx,
2003                                        dbmap->dbs[i].dbid,
2004                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
2005                                        pnn, nodemap, generation);
2006                 if (ret != 0) {
2007                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
2008                         return -1;
2009                 }
2010         }
2011
2012         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
2013
2014         /* commit all the changes */
2015         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
2016                                         nodes, 0,
2017                                         CONTROL_TIMEOUT(), false, data,
2018                                         NULL, NULL,
2019                                         NULL) != 0) {
2020                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
2021                 return -1;
2022         }
2023
2024         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
2025
2026         /* build a new vnn map with all the currently active and
2027            unbanned nodes */
2028         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
2029         CTDB_NO_MEMORY(ctdb, vnnmap);
2030         vnnmap->generation = generation;
2031         vnnmap->size = 0;
2032         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
2033         CTDB_NO_MEMORY(ctdb, vnnmap->map);
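        /* Start from an empty map; an entry is appended below for every
         * active node that has the lmaster capability.
         */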
2034         for (i=j=0;i<nodemap->num;i++) {
2035                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2036                         continue;
2037                 }
2038                 if (!ctdb_node_has_capabilities(rec->caps,
2039                                                 ctdb->nodes[i]->pnn,
2040                                                 CTDB_CAP_LMASTER)) {
2041                         /* this node can not be an lmaster */
                        DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
2043                         continue;
2044                 }
2045
2046                 vnnmap->size++;
2047                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2048                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2049                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
2050
2051         }
2052         if (vnnmap->size == 0) {
2053                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
2054                 vnnmap->size++;
2055                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2056                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2057                 vnnmap->map[0] = pnn;
2058         }
2059
2060         /* update to the new vnnmap on all nodes */
2061         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
2062         if (ret != 0) {
2063                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
2064                 return -1;
2065         }
2066
2067         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2068
2069         /* update recmaster to point to us for all nodes */
2070         ret = set_recovery_master(ctdb, nodemap, pnn);
2071         if (ret!=0) {
2072                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
2073                 return -1;
2074         }
2075
2076         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
2077
2078         /* disable recovery mode */
2079         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
2080         if (ret != 0) {
2081                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2082                 return -1;
2083         }
2084
2085         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2086
2087         return 0;
2088 }
2089
2090 /*
2091   we are the recmaster, and recovery is needed - start a recovery run
2092  */
2093 static int do_recovery(struct ctdb_recoverd *rec,
2094                        TALLOC_CTX *mem_ctx, uint32_t pnn,
2095                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
2096 {
2097         struct ctdb_context *ctdb = rec->ctdb;
2098         int i, ret;
2099         struct ctdb_dbid_map *dbmap;
2100         struct timeval start_time;
2101         uint32_t culprit = (uint32_t)-1;
2102         bool self_ban;
2103         bool par_recovery;
2104
2105         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2106
2107         /* Check if the current node is still the recmaster.  It's possible that
2108          * re-election has changed the recmaster, but we have not yet updated
2109          * that information.
2110          */
2111         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2112                                      pnn, &ctdb->recovery_master);
2113         if (ret != 0) {
2114                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster\n"));
2115                 return -1;
2116         }
2117
2118         if (pnn != ctdb->recovery_master) {
2119                 DEBUG(DEBUG_NOTICE,
2120                       ("Recovery master changed to %u, aborting recovery\n",
2121                        ctdb->recovery_master));
2122                 return -1;
2123         }
2124
2125         /* if recovery fails, force it again */
2126         rec->need_recovery = true;
2127
2128         if (!ctdb_op_begin(rec->recovery)) {
2129                 return -1;
2130         }
2131
2132         if (rec->election_timeout) {
2133                 /* an election is in progress */
2134                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2135                 goto fail;
2136         }
2137
2138         ban_misbehaving_nodes(rec, &self_ban);
2139         if (self_ban) {
2140                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2141                 goto fail;
2142         }
2143
2144         if (ctdb->recovery_lock_file != NULL) {
2145                 if (ctdb_recovery_have_lock(ctdb)) {
2146                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2147                 } else {
2148                         start_time = timeval_current();
2149                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2150                                              ctdb->recovery_lock_file));
2151                         if (!ctdb_recovery_lock(ctdb)) {
2152                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
                                        /* If ctdb is trying its first recovery, it's
                                         * possible that the current node does not
                                         * yet know who the recmaster is.
2156                                          */
2157                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2158                                                           " - retrying recovery\n"));
2159                                         goto fail;
2160                                 }
2161
2162                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
                                                 "and banning ourselves for %u seconds\n",
2164                                                  ctdb->tunable.recovery_ban_period));
2165                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2166                                 goto fail;
2167                         }
2168                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2169                                                            CONTROL_TIMEOUT(),
2170                                                            timeval_elapsed(&start_time));
2171                         DEBUG(DEBUG_NOTICE,
2172                               ("Recovery lock taken successfully by recovery daemon\n"));
2173                 }
2174         }
2175
2176         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2177
2178         /* get a list of all databases */
2179         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2180         if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
2182                 goto fail;
2183         }
2184
2185         /* we do the db creation before we set the recovery mode, so the freeze happens
2186            on all databases we will be dealing with. */
2187
2188         /* verify that we have all the databases any other node has */
2189         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2190         if (ret != 0) {
2191                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2192                 goto fail;
2193         }
2194
2195         /* verify that all other nodes have all our databases */
2196         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2197         if (ret != 0) {
2198                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2199                 goto fail;
2200         }
2201         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2202
2203         /* update the database priority for all remote databases */
2204         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2205         if (ret != 0) {
2206                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2207         }
2208         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2209
2210
2211         /* update all other nodes to use the same setting for reclock files
2212            as the local recovery master.
2213         */
2214         sync_recovery_lock_file_across_cluster(rec);
2215
2216         /* update the capabilities for all nodes */
2217         ret = update_capabilities(rec, nodemap);
2218         if (ret!=0) {
2219                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                goto fail;
2221         }
2222
2223         /*
2224           update all nodes to have the same flags that we have
2225          */
2226         for (i=0;i<nodemap->num;i++) {
2227                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2228                         continue;
2229                 }
2230
2231                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2232                 if (ret != 0) {
2233                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                                DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
2235                         } else {
2236                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
                                goto fail;
2238                         }
2239                 }
2240         }
2241
2242         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2243
2244         /* Check if all participating nodes have parallel recovery capability */
2245         par_recovery = true;
2246         for (i=0; i<nodemap->num; i++) {
2247                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2248                         continue;
2249                 }
2250
2251                 if (!(rec->caps[i].capabilities &
2252                       CTDB_CAP_PARALLEL_RECOVERY)) {
2253                         par_recovery = false;
2254                         break;
2255                 }
2256         }
2257
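        /* A single active node without CTDB_CAP_PARALLEL_RECOVERY forces
         * the serial fallback below for the whole cluster.
         */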
2258         if (par_recovery) {
2259                 ret = db_recovery_parallel(rec, mem_ctx);
2260         } else {
2261                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2262                                          dbmap);
2263         }
2264
2265         if (ret != 0) {
2266                 goto fail;
2267         }
2268
2269         /* Fetch known/available public IPs from each active node */
2270         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2271         if (ret != 0) {
2272                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2273                                  culprit));
2274                 rec->need_takeover_run = true;
2275                 goto fail;
2276         }
2277
2278         do_takeover_run(rec, nodemap, false);
2279
2280         /* execute the "recovered" event script on all nodes */
2281         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2282         if (ret!=0) {
2283                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2284                 goto fail;
2285         }
2286
2287         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2288
2289         /* send a message to all clients telling them that the cluster 
2290            has been reconfigured */
2291         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2292                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2293         if (ret != 0) {
2294                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2295                 goto fail;
2296         }
2297
2298         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2299
2300         rec->need_recovery = false;
2301         ctdb_op_end(rec->recovery);
2302
2303         /* we managed to complete a full recovery, make sure to forgive
2304            any past sins by the nodes that could now participate in the
2305            recovery.
2306         */
2307         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2308         for (i=0;i<nodemap->num;i++) {
2309                 struct ctdb_banning_state *ban_state;
2310
2311                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2312                         continue;
2313                 }
2314
2315                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2316                 if (ban_state == NULL) {
2317                         continue;
2318                 }
2319
2320                 ban_state->count = 0;
2321         }
2322
2323         /* We just finished a recovery successfully.
2324            We now wait for rerecovery_timeout before we allow
2325            another recovery to take place.
2326         */
        DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%u seconds)\n", ctdb->tunable.rerecovery_timeout));
2328         ctdb_op_disable(rec->recovery, ctdb->ev,
2329                         ctdb->tunable.rerecovery_timeout);
2330         return 0;
2331
2332 fail:
2333         ctdb_op_end(rec->recovery);
2334         return -1;
2335 }
2336
2337
2338 /*
2339   elections are won by first checking the number of connected nodes, then
2340   the priority time, then the pnn
2341  */
2342 struct election_message {
2343         uint32_t num_connected;
2344         struct timeval priority_time;
2345         uint32_t pnn;
2346         uint32_t node_flags;
2347 };
2348
2349 /*
  form this node's election data
2351  */
2352 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2353 {
2354         int ret, i;
2355         struct ctdb_node_map *nodemap;
2356         struct ctdb_context *ctdb = rec->ctdb;
2357
2358         ZERO_STRUCTP(em);
2359
2360         em->pnn = rec->ctdb->pnn;
2361         em->priority_time = rec->priority_time;
2362
2363         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2364         if (ret != 0) {
2365                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2366                 return;
2367         }
2368
2369         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2370         em->node_flags = rec->node_flags;
2371
2372         for (i=0;i<nodemap->num;i++) {
2373                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2374                         em->num_connected++;
2375                 }
2376         }
2377
        /* we shouldn't try to win this election if we can't be a recmaster */
2379         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2380                 em->num_connected = 0;
2381                 em->priority_time = timeval_current();
2382         }
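        /* The handicap above makes this node compare worse than any
         * node that can actually take the recmaster role: fewer
         * connections always lose, and a current priority_time loses
         * to any longer-running node.
         */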
2383
2384         talloc_free(nodemap);
2385 }
2386
2387 /*
2388   see if the given election data wins
2389  */
2390 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2391 {
2392         struct election_message myem;
2393         int cmp = 0;
2394
2395         ctdb_election_data(rec, &myem);
2396
        /* we can't win if we don't have the recmaster capability */
2398         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2399                 return false;
2400         }
2401
        /* we can't win if we are banned */
2403         if (rec->node_flags & NODE_FLAGS_BANNED) {
2404                 return false;
2405         }
2406
        /* we can't win if we are stopped */
2408         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2409                 return false;
2410         }
2411
2412         /* we will automatically win if the other node is banned */
2413         if (em->node_flags & NODE_FLAGS_BANNED) {
2414                 return true;
2415         }
2416
        /* we will automatically win if the other node is stopped */
2418         if (em->node_flags & NODE_FLAGS_STOPPED) {
2419                 return true;
2420         }
2421
        /* try to use the most connected node */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
        }

        /* then the longest running node */
        if (cmp == 0) {
                cmp = timeval_compare(&em->priority_time, &myem.priority_time);
        }

        /* finally the pnn */
        if (cmp == 0) {
                cmp = (int)myem.pnn - (int)em->pnn;
        }
2430
2431         return cmp > 0;
2432 }
2433
2434 /*
2435   send out an election request
2436  */
2437 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2438 {
2439         int ret;
2440         TDB_DATA election_data;
2441         struct election_message emsg;
2442         uint64_t srvid;
2443         struct ctdb_context *ctdb = rec->ctdb;
2444
2445         srvid = CTDB_SRVID_RECOVERY;
2446
2447         ctdb_election_data(rec, &emsg);
2448
2449         election_data.dsize = sizeof(struct election_message);
2450         election_data.dptr  = (unsigned char *)&emsg;
2451
2452
        /* first we assume we will win the election and set the
           recovery master to be ourselves on the current node
         */
2456         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2457         if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on the current node\n"));
2459                 return -1;
2460         }
2461
2462
        /* send an election message to all nodes */
2464         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2465         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2466 }
2467
2468 /*
2469   this function will unban all nodes in the cluster
2470 */
2471 static void unban_all_nodes(struct ctdb_context *ctdb)
2472 {
2473         int ret, i;
2474         struct ctdb_node_map *nodemap;
2475         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2476         
2477         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2478         if (ret != 0) {
2479                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2480                 return;
2481         }
2482
2483         for (i=0;i<nodemap->num;i++) {
2484                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2485                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2486                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2487                                                  nodemap->nodes[i].pnn, 0,
2488                                                  NODE_FLAGS_BANNED);
2489                         if (ret != 0) {
2490                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2491                         }
2492                 }
2493         }
2494
2495         talloc_free(tmp_ctx);
2496 }
2497
2498
2499 /*
2500   we think we are winning the election - send a broadcast election request
2501  */
2502 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2503 {
2504         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2505         int ret;
2506
2507         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2508         if (ret != 0) {
2509                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2510         }
2511
2512         talloc_free(rec->send_election_te);
2513         rec->send_election_te = NULL;
2514 }
2515
2516 /*
2517   handler for memory dumps
2518 */
2519 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2520 {
2521         struct ctdb_recoverd *rec = talloc_get_type(
2522                 private_data, struct ctdb_recoverd);
2523         struct ctdb_context *ctdb = rec->ctdb;
2524         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2525         TDB_DATA *dump;
2526         int ret;
2527         struct srvid_request *rd;
2528
2529         if (data.dsize != sizeof(struct srvid_request)) {
2530                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2531                 talloc_free(tmp_ctx);
2532                 return;
2533         }
2534         rd = (struct srvid_request *)data.dptr;
2535
2536         dump = talloc_zero(tmp_ctx, TDB_DATA);
2537         if (dump == NULL) {
2538                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2539                 talloc_free(tmp_ctx);
2540                 return;
2541         }
2542         ret = ctdb_dump_memory(ctdb, dump);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2545                 talloc_free(tmp_ctx);
2546                 return;
2547         }
2548
        DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2550
2551         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2552         if (ret != 0) {
2553                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2554                 talloc_free(tmp_ctx);
2555                 return;
2556         }
2557
2558         talloc_free(tmp_ctx);
2559 }
2560
2561 /*
2562   handler for reload_nodes
2563 */
2564 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2565                                  void *private_data)
2566 {
2567         struct ctdb_recoverd *rec = talloc_get_type(
2568                 private_data, struct ctdb_recoverd);
2569
2570         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2571
2572         ctdb_load_nodes_file(rec->ctdb);
2573 }
2574
2575
2576 static void ctdb_rebalance_timeout(struct event_context *ev,
2577                                    struct timed_event *te,
2578                                    struct timeval t, void *p)
2579 {
2580         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2581
2582         if (rec->force_rebalance_nodes == NULL) {
2583                 DEBUG(DEBUG_ERR,
2584                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2585                 return;
2586         }
2587
2588         DEBUG(DEBUG_NOTICE,
2589               ("Rebalance timeout occurred - do takeover run\n"));
2590         do_takeover_run(rec, rec->nodemap, false);
2591 }
2592
2593
2594 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2595                                         void *private_data)
2596 {
2597         struct ctdb_recoverd *rec = talloc_get_type(
2598                 private_data, struct ctdb_recoverd);
2599         struct ctdb_context *ctdb = rec->ctdb;
2600         uint32_t pnn;
2601         uint32_t *t;
2602         int len;
2603         uint32_t deferred_rebalance;
2604
2605         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2606                 return;
2607         }
2608
2609         if (data.dsize != sizeof(uint32_t)) {
2610                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2611                 return;
2612         }
2613
2614         pnn = *(uint32_t *)&data.dptr[0];
2615
2616         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2617
        /* Copy any existing list of nodes into a freshly allocated
         * array rather than reallocating.  The rebalance timeout event
         * below is parented on this array, so freeing the old array
         * also cancels any pending timeout.
         */
2623         len = (rec->force_rebalance_nodes != NULL) ?
2624                 talloc_array_length(rec->force_rebalance_nodes) :
2625                 0;
2626
2627         /* This allows duplicates to be added but they don't cause
2628          * harm.  A call to add a duplicate PNN arguably means that
2629          * the timeout should be reset, so this is the simplest
2630          * solution.
2631          */
2632         t = talloc_zero_array(rec, uint32_t, len+1);
2633         CTDB_NO_MEMORY_VOID(ctdb, t);
2634         if (len > 0) {
2635                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2636         }
2637         t[len] = pnn;
2638
2639         talloc_free(rec->force_rebalance_nodes);
2640
2641         rec->force_rebalance_nodes = t;
2642
2643         /* If configured, setup a deferred takeover run to make sure
2644          * that certain nodes get IPs rebalanced to them.  This will
2645          * be cancelled if a successful takeover run happens before
2646          * the timeout.  Assign tunable value to variable for
2647          * readability.
2648          */
2649         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2650         if (deferred_rebalance != 0) {
2651                 event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2652                                 timeval_current_ofs(deferred_rebalance, 0),
2653                                 ctdb_rebalance_timeout, rec);
2654         }
2655 }
2656
2657
2658
2659 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2660                                    void *private_data)
2661 {
2662         struct ctdb_recoverd *rec = talloc_get_type(
2663                 private_data, struct ctdb_recoverd);
2664         struct ctdb_public_ip *ip;
2665
2666         if (rec->recmaster != rec->ctdb->pnn) {
2667                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2668                 return;
2669         }
2670
2671         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2672                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2673                 return;
2674         }
2675
2676         ip = (struct ctdb_public_ip *)data.dptr;
2677
2678         update_ip_assignment_tree(rec->ctdb, ip);
2679 }
2680
2681 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2682                                     TDB_DATA data,
2683                                     struct ctdb_op_state *op_state)
2684 {
2685         struct srvid_request_data *r;
2686         uint32_t timeout;
2687         TDB_DATA result;
2688         int32_t ret = 0;
2689
2690         /* Validate input data */
2691         if (data.dsize != sizeof(struct srvid_request_data)) {
                DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu "
                                 "expecting %lu\n", (long unsigned)data.dsize,
                                 (long unsigned)sizeof(struct srvid_request_data)));
2695                 return;
2696         }
2697         if (data.dptr == NULL) {
2698                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2699                 return;
2700         }
2701
2702         r = (struct srvid_request_data *)data.dptr;
2703         timeout = r->data;
2704
2705         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2706         if (ret != 0) {
2707                 goto done;
2708         }
2709
2710         /* Returning our PNN tells the caller that we succeeded */
2711         ret = ctdb_get_pnn(ctdb);
2712 done:
2713         result.dsize = sizeof(int32_t);
2714         result.dptr  = (uint8_t *)&ret;
2715         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2716 }
2717
2718 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2719                                           void *private_data)
2720 {
2721         struct ctdb_recoverd *rec = talloc_get_type(
2722                 private_data, struct ctdb_recoverd);
2723
2724         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2725 }
2726
2727 /* Backward compatibility for this SRVID */
2728 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2729                                      void *private_data)
2730 {
2731         struct ctdb_recoverd *rec = talloc_get_type(
2732                 private_data, struct ctdb_recoverd);
2733         uint32_t timeout;
2734
2735         if (data.dsize != sizeof(uint32_t)) {
                DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu "
2737                                  "expecting %lu\n", (long unsigned)data.dsize,
2738                                  (long unsigned)sizeof(uint32_t)));
2739                 return;
2740         }
2741         if (data.dptr == NULL) {
2742                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2743                 return;
2744         }
2745
2746         timeout = *((uint32_t *)data.dptr);
2747
2748         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2749 }
2750
2751 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2752                                        void *private_data)
2753 {
2754         struct ctdb_recoverd *rec = talloc_get_type(
2755                 private_data, struct ctdb_recoverd);
2756
2757         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2758 }
2759
2760 /*
  handler for ip reallocate: just add the request to the list and
  handle it later in the monitor_cluster loop, so we do not recurse
  into takeover_run() from within another request
2764 */
2765 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2766                                   void *private_data)
2767 {
2768         struct srvid_request *request;
2769         struct ctdb_recoverd *rec = talloc_get_type(
2770                 private_data, struct ctdb_recoverd);
2771
2772         if (data.dsize != sizeof(struct srvid_request)) {
2773                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2774                 return;
2775         }
2776
2777         request = (struct srvid_request *)data.dptr;
2778
2779         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2780 }
2781
2782 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2783                                           struct ctdb_recoverd *rec)
2784 {
2785         TDB_DATA result;
2786         int32_t ret;
2787         uint32_t culprit;
2788         struct srvid_requests *current;
2789
2790         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2791
2792         /* Only process requests that are currently pending.  More
2793          * might come in while the takeover run is in progress and
2794          * they will need to be processed later since they might
         * be in response to flag changes.
2796          */
2797         current = rec->reallocate_requests;
2798         rec->reallocate_requests = NULL;
2799
2800         /* update the list of public ips that a node can handle for
2801            all connected nodes
2802         */
2803         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2804         if (ret != 0) {
2805                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2806                                  culprit));
2807                 rec->need_takeover_run = true;
2808         }
2809         if (ret == 0) {
2810                 if (do_takeover_run(rec, rec->nodemap, false)) {
2811                         ret = ctdb_get_pnn(ctdb);
2812                 } else {
2813                         ret = -1;
2814                 }
2815         }
2816
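        /* The reply payload is an int32_t: this node's pnn on success,
           or a nonzero error code on failure */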
2817         result.dsize = sizeof(int32_t);
2818         result.dptr  = (uint8_t *)&ret;
2819
2820         srvid_requests_reply(ctdb, &current, result);
2821 }
2822
2823
2824 /*
2825   handler for recovery master elections
2826 */
2827 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2828 {
2829         struct ctdb_recoverd *rec = talloc_get_type(
2830                 private_data, struct ctdb_recoverd);
2831         struct ctdb_context *ctdb = rec->ctdb;
2832         int ret;
2833         struct election_message *em = (struct election_message *)data.dptr;
2834
2835         /* Ignore election packets from ourself */
2836         if (ctdb->pnn == em->pnn) {
2837                 return;
2838         }
2839
2840         /* we got an election packet - update the timeout for the election */
2841         talloc_free(rec->election_timeout);
2842         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2843                                                 fast_start ?
2844                                                 timeval_current_ofs(0, 500000) :
2845                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2846                                                 ctdb_election_timeout, rec);
2847
2848         /* someone called an election. check their election data
2849            and if we disagree and we would rather be the elected node, 
2850            send a new election message to all other nodes
2851          */
2852         if (ctdb_election_win(rec, em)) {
2853                 if (!rec->send_election_te) {
2854                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2855                                                                 timeval_current_ofs(0, 500000),
2856                                                                 election_send_request, rec);
2857                 }
2858                 /*unban_all_nodes(ctdb);*/
2859                 return;
2860         }
2861
2862         /* we didn't win */
2863         TALLOC_FREE(rec->send_election_te);
2864
2865         /* Release the recovery lock file */
2866         if (ctdb_recovery_have_lock(ctdb)) {
2867                 ctdb_recovery_unlock(ctdb);
2868                 unban_all_nodes(ctdb);
2869         }
2870
2871         clear_ip_assignment_tree(ctdb);
2872
2873         /* ok, let that node become recmaster then */
2874         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2875         if (ret != 0) {
2876                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2877                 return;
2878         }
2879
2880         return;
2881 }
2882
2883
2884 /*
2885   force the start of the election process
2886  */
2887 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2888                            struct ctdb_node_map *nodemap)
2889 {
2890         int ret;
2891         struct ctdb_context *ctdb = rec->ctdb;
2892
2893         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2894
2895         /* set all nodes to recovery mode to stop all internode traffic */
2896         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2897         if (ret != 0) {
2898                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2899                 return;
2900         }
2901
2902         talloc_free(rec->election_timeout);
2903         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2904                                                 fast_start ?
2905                                                 timeval_current_ofs(0, 500000) :
2906                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2907                                                 ctdb_election_timeout, rec);
2908
2909         ret = send_election_request(rec, pnn);
2910         if (ret != 0) {
2911                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2912                 return;
2913         }
2914
2915         /* wait for a few seconds to collect all responses */
2916         ctdb_wait_election(rec);
2917 }
2918
2919
2920
2921 /*
2922   handler for when a node changes its flags
2923 */
2924 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2925 {
2926         struct ctdb_recoverd *rec = talloc_get_type(
2927                 private_data, struct ctdb_recoverd);
2928         struct ctdb_context *ctdb = rec->ctdb;
2929         int ret;
2930         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2931         struct ctdb_node_map *nodemap=NULL;
2932         TALLOC_CTX *tmp_ctx;
2933         int i;
2934         int disabled_flag_changed;
2935
2936         if (data.dsize != sizeof(*c)) {
2937                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2938                 return;
2939         }
2940
2941         tmp_ctx = talloc_new(ctdb);
2942         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2943
2944         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2945         if (ret != 0) {
2946                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2947                 talloc_free(tmp_ctx);
2948                 return;         
2949         }
2950
2951
2952         for (i=0;i<nodemap->num;i++) {
2953                 if (nodemap->nodes[i].pnn == c->pnn) break;
2954         }
2955
2956         if (i == nodemap->num) {
2957                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2958                 talloc_free(tmp_ctx);
2959                 return;
2960         }
2961
2962         if (c->old_flags != c->new_flags) {
2963                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x, was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2964         }
2965
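        /* Note whether the DISABLED flag bits differ between our cached
           flags and the new flags; only that transition needs an IP
           takeover run (see below) */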
2966         disabled_flag_changed = (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2967
2968         nodemap->nodes[i].flags = c->new_flags;
2969
2970         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2971                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2972
2973         if (ret == 0) {
2974                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2975                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2976         }
2977         
2978         if (ret == 0 &&
2979             ctdb->recovery_master == ctdb->pnn &&
2980             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2981                 /* Only do the takeover run if the permanently-disabled or
2982                    unhealthy flags changed, since these cause an IP failover
2983                    but not a recovery.
2984                    If the node became disconnected or banned this will also
2985                    lead to an IP address failover, but that is handled
2986                    during recovery.
2987                 */
2988                 if (disabled_flag_changed) {
2989                         rec->need_takeover_run = true;
2990                 }
2991         }
2992
2993         talloc_free(tmp_ctx);
2994 }
2995
2996 /*
2997   handler for when we need to push out flag changes to all other nodes
2998 */
2999 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
3000                                void *private_data)
3001 {
3002         struct ctdb_recoverd *rec = talloc_get_type(
3003                 private_data, struct ctdb_recoverd);
3004         struct ctdb_context *ctdb = rec->ctdb;
3005         int ret;
3006         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
3007         struct ctdb_node_map *nodemap=NULL;
3008         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3009         uint32_t recmaster;
3010         uint32_t *nodes;
        CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
3011
3012         /* find the recovery master */
3013         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
3014         if (ret != 0) {
3015                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3016                 talloc_free(tmp_ctx);
3017                 return;
3018         }
3019
3020         /* read the node flags from the recmaster */
3021         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
3022         if (ret != 0) {
3023                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
3024                 talloc_free(tmp_ctx);
3025                 return;
3026         }
3027         if (c->pnn >= nodemap->num) {
3028                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
3029                 talloc_free(tmp_ctx);
3030                 return;
3031         }
3032
3033         /* send the flags update to all connected nodes */
3034         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3035
3036         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3037                                       nodes, 0, CONTROL_TIMEOUT(),
3038                                       false, data,
3039                                       NULL, NULL,
3040                                       NULL) != 0) {
3041                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
3042
3043                 talloc_free(tmp_ctx);
3044                 return;
3045         }
3046
3047         talloc_free(tmp_ctx);
3048 }
3049
3050
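/* State shared between verify_recmode() and its per-node callback:
   the number of outstanding controls and the aggregated result */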
3051 struct verify_recmode_normal_data {
3052         uint32_t count;
3053         enum monitor_result status;
3054 };
3055
3056 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
3057 {
3058         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
3059
3060
3061         /* one more node has responded with recmode data */
3062         rmdata->count--;
3063
3064         /* if we failed to get the recmode, then return an error and let
3065            the main loop try again.
3066         */
3067         if (state->state != CTDB_CONTROL_DONE) {
3068                 if (rmdata->status == MONITOR_OK) {
3069                         rmdata->status = MONITOR_FAILED;
3070                 }
3071                 return;
3072         }
3073
3074         /* if we got a response, then the recmode will be stored in the
3075            status field
3076         */
3077         if (state->status != CTDB_RECOVERY_NORMAL) {
3078                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
3079                 rmdata->status = MONITOR_RECOVERY_NEEDED;
3080         }
3081
3082         return;
3083 }
3084
3085
3086 /* verify that all nodes are in normal recovery mode */
3087 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
3088 {
3089         struct verify_recmode_normal_data *rmdata;
3090         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3091         struct ctdb_client_control_state *state;
3092         enum monitor_result status;
3093         int j;
3094         
3095         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
3096         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3097         rmdata->count  = 0;
3098         rmdata->status = MONITOR_OK;
3099
3100         /* loop over all active nodes and send an async getrecmode call to 
3101            them */
3102         for (j=0; j<nodemap->num; j++) {
3103                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3104                         continue;
3105                 }
3106                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
3107                                         CONTROL_TIMEOUT(), 
3108                                         nodemap->nodes[j].pnn);
3109                 if (state == NULL) {
3110                         /* we failed to send the control, treat this as 
3111                            an error and try again next iteration
3112                         */                      
3113                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
3114                         talloc_free(mem_ctx);
3115                         return MONITOR_FAILED;
3116                 }
3117
3118                 /* set up the callback functions */
3119                 state->async.fn = verify_recmode_normal_callback;
3120                 state->async.private_data = rmdata;
3121
3122                 /* one more control to wait for to complete */
3123                 rmdata->count++;
3124         }
3125
3126
3127         /* now wait for up to the maximum number of seconds allowed
3128            or until all nodes we expect a response from have replied
3129         */
3130         while (rmdata->count > 0) {
3131                 event_loop_once(ctdb->ev);
3132         }
3133
3134         status = rmdata->status;
3135         talloc_free(mem_ctx);
3136         return status;
3137 }
3138
3139
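/* State shared between verify_recmaster() and its per-node callback */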
3140 struct verify_recmaster_data {
3141         struct ctdb_recoverd *rec;
3142         uint32_t count;
3143         uint32_t pnn;
3144         enum monitor_result status;
3145 };
3146
3147 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3148 {
3149         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3150
3151
3152         /* one more node has responded with recmaster data */
3153         rmdata->count--;
3154
3155         /* if we failed to get the recmaster, then return an error and let
3156            the main loop try again.
3157         */
3158         if (state->state != CTDB_CONTROL_DONE) {
3159                 if (rmdata->status == MONITOR_OK) {
3160                         rmdata->status = MONITOR_FAILED;
3161                 }
3162                 return;
3163         }
3164
3165         /* if we got a response, then the recmaster will be stored in the
3166            status field
3167         */
3168         if (state->status != rmdata->pnn) {
3169                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3170                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3171                 rmdata->status = MONITOR_ELECTION_NEEDED;
3172         }
3173
3174         return;
3175 }
3176
3177
3178 /* verify that all nodes agree that we are the recmaster */
3179 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3180 {
3181         struct ctdb_context *ctdb = rec->ctdb;
3182         struct verify_recmaster_data *rmdata;
3183         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3184         struct ctdb_client_control_state *state;
3185         enum monitor_result status;
3186         int j;
3187         
3188         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3189         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3190         rmdata->rec    = rec;
3191         rmdata->count  = 0;
3192         rmdata->pnn    = pnn;
3193         rmdata->status = MONITOR_OK;
3194
3195         /* loop over all active nodes and send an async getrecmaster call to 
3196            them */
3197         for (j=0; j<nodemap->num; j++) {
3198                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3199                         continue;
3200                 }
3201                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3202                                         CONTROL_TIMEOUT(),
3203                                         nodemap->nodes[j].pnn);
3204                 if (state == NULL) {
3205                         /* we failed to send the control, treat this as 
3206                            an error and try again next iteration
3207                         */                      
3208                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3209                         talloc_free(mem_ctx);
3210                         return MONITOR_FAILED;
3211                 }
3212
3213                 /* set up the callback functions */
3214                 state->async.fn = verify_recmaster_callback;
3215                 state->async.private_data = rmdata;
3216
3217                 /* one more control to wait for to complete */
3218                 rmdata->count++;
3219         }
3220
3221
3222         /* now wait for up to the maximum number of seconds allowed
3223            or until all nodes we expect a response from have replied
3224         */
3225         while (rmdata->count > 0) {
3226                 event_loop_once(ctdb->ev);
3227         }
3228
3229         status = rmdata->status;
3230         talloc_free(mem_ctx);
3231         return status;
3232 }
3233
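/* Compare the interface list reported by the local node against the
   copy cached in the recovery daemon.  Returns true if the number of
   interfaces, an interface name or a link state has changed, and
   refreshes the cached copy either way. */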
3234 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3235                                     struct ctdb_recoverd *rec)
3236 {
3237         struct ctdb_control_get_ifaces *ifaces = NULL;
3238         TALLOC_CTX *mem_ctx;
3239         bool ret = false;
3240
3241         mem_ctx = talloc_new(NULL);
3242
3243         /* Read the interfaces from the local node */
3244         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3245                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3246                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3247                 /* We could return an error.  However, this will be
3248                  * rare so we'll decide that the interfaces have
3249                  * actually changed, just in case.
3250                  */
3251                 talloc_free(mem_ctx);
3252                 return true;
3253         }
3254
3255         if (!rec->ifaces) {
3256                 /* We haven't been here before so things have changed */
3257                 DEBUG(DEBUG_NOTICE, ("Initial interface list fetched\n"));
3258                 ret = true;
3259         } else if (rec->ifaces->num != ifaces->num) {
3260                 /* Number of interfaces has changed */
3261                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3262                                      rec->ifaces->num, ifaces->num));
3263                 ret = true;
3264         } else {
3265                 /* See if interface names or link states have changed */
3266                 int i;
3267                 for (i = 0; i < rec->ifaces->num; i++) {
3268                         struct ctdb_control_iface_info *iface = &rec->ifaces->ifaces[i];
3269                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3270                                 DEBUG(DEBUG_NOTICE,
3271                                       ("Interface in slot %d changed: %s => %s\n",
3272                                        i, iface->name, ifaces->ifaces[i].name));
3273                                 ret = true;
3274                                 break;
3275                         }
3276                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3277                                 DEBUG(DEBUG_NOTICE,
3278                                       ("Interface %s changed state: %d => %d\n",
3279                                        iface->name, iface->link_state,
3280                                        ifaces->ifaces[i].link_state));
3281                                 ret = true;
3282                                 break;
3283                         }
3284                 }
3285         }
3286
3287         talloc_free(rec->ifaces);
3288         rec->ifaces = talloc_steal(rec, ifaces);
3289
3290         talloc_free(mem_ctx);
3291         return ret;
3292 }
3293
3294 /* called to check that the local allocation of public ip addresses is ok.
3295 */
3296 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3297 {
3298         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3299         struct ctdb_uptime *uptime1 = NULL;
3300         struct ctdb_uptime *uptime2 = NULL;
3301         int ret, j;
3302         bool need_takeover_run = false;
3303
3304         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3305                                 CTDB_CURRENT_NODE, &uptime1);
3306         if (ret != 0) {
3307                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3308                 talloc_free(mem_ctx);
3309                 return -1;
3310         }
3311
3312         if (interfaces_have_changed(ctdb, rec)) {
3313                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3314                                      "local node %u - force takeover run\n",
3315                                      pnn));
3316                 need_takeover_run = true;
3317         }
3318
3319         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3320                                 CTDB_CURRENT_NODE, &uptime2);
3321         if (ret != 0) {
3322                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3323                 talloc_free(mem_ctx);
3324                 return -1;
3325         }
3326
3327         /* skip the check if the startrecovery time has changed */
3328         if (timeval_compare(&uptime1->last_recovery_started,
3329                             &uptime2->last_recovery_started) != 0) {
3330                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery start time changed while we read the public ip list. Skipping public ip address check\n"));
3331                 talloc_free(mem_ctx);
3332                 return 0;
3333         }
3334
3335         /* skip the check if the endrecovery time has changed */
3336         if (timeval_compare(&uptime1->last_recovery_finished,
3337                             &uptime2->last_recovery_finished) != 0) {
3338                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery finish time changed while we read the public ip list. Skipping public ip address check\n"));
3339                 talloc_free(mem_ctx);
3340                 return 0;
3341         }
3342
3343         /* skip the check if we have started but not finished recovery */
3344         if (timeval_compare(&uptime1->last_recovery_finished,
3345                             &uptime1->last_recovery_started) != 1) {
3346                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3347                 talloc_free(mem_ctx);
3348
3349                 return 0;
3350         }
3351
3352         /* Verify that we have the IP addresses we should have and
3353            don't have ones we shouldn't have.
3354            If we find an inconsistency we set recmode to active on the
3355            local node and wait for the recmaster to do a full-blown
3356            recovery.
3357            Also, if an IP's pnn is -1 and we are healthy and can host
3358            the IP, we request an IP reallocation.
3359         */
3360         if (ctdb->tunable.disable_ip_failover == 0) {
3361                 struct ctdb_all_public_ips *ips = NULL;
3362
3363                 /* read the *available* IPs from the local node */
3364                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3365                 if (ret != 0) {
3366                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3367                         talloc_free(mem_ctx);
3368                         return -1;
3369                 }
3370
3371                 for (j=0; j<ips->num; j++) {
3372                         if (ips->ips[j].pnn == -1 &&
3373                             nodemap->nodes[pnn].flags == 0) {
3374                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3375                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3376                                 need_takeover_run = true;
3377                         }
3378                 }
3379
3380                 talloc_free(ips);
3381
3382                 /* read the *known* IPs from the local node */
3383                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3384                 if (ret != 0) {
3385                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3386                         talloc_free(mem_ctx);
3387                         return -1;
3388                 }
3389
3390                 for (j=0; j<ips->num; j++) {
3391                         if (ips->ips[j].pnn == pnn) {
3392                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3393                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3394                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3395                                         need_takeover_run = true;
3396                                 }
3397                         } else {
3398                                 if (ctdb->do_checkpublicip &&
3399                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3400
3401                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3402                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3403
3404                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3405                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3406                                         }
3407                                 }
3408                         }
3409                 }
3410         }
3411
3412         if (need_takeover_run) {
3413                 struct srvid_request rd;
3414                 TDB_DATA data;
3415
3416                 DEBUG(DEBUG_CRIT,("Triggering takeover run\n"));
3417
3418                 rd.pnn = ctdb->pnn;
3419                 rd.srvid = 0;
3420                 data.dptr = (uint8_t *)&rd;
3421                 data.dsize = sizeof(rd);
3422
3423                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3424                 if (ret != 0) {
3425                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
3426                 }
3427         }
3428         talloc_free(mem_ctx);
3429         return 0;
3430 }
3431
3432
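/* Callback for the async CTDB_CONTROL_GET_NODEMAP calls: stash each
   remote node's nodemap in the array, indexed by its pnn */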
3433 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3434 {
3435         struct ctdb_node_map **remote_nodemaps = callback_data;
3436
3437         if (node_pnn >= ctdb->num_nodes) {
3438                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3439                 return;
3440         }
3441
3442         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3444 }
3445
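/* Fetch the nodemap from every active node in parallel; the results
   are collected in remote_nodemaps[], indexed by pnn */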
3446 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3447         struct ctdb_node_map *nodemap,
3448         struct ctdb_node_map **remote_nodemaps)
3449 {
3450         uint32_t *nodes;
3451
3452         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3453         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3454                                         nodes, 0,
3455                                         CONTROL_TIMEOUT(), false, tdb_null,
3456                                         async_getnodemap_callback,
3457                                         NULL,
3458                                         remote_nodemaps) != 0) {
3459                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3460
3461                 return -1;
3462         }
3463
3464         return 0;
3465 }
3466
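/* Re-read the reclock file setting from the local ctdb daemon so the
   recovery daemon can react if it has changed */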
3467 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3468 {
3469         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3470         const char *reclockfile;
3471
3472         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3473                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3474                 talloc_free(tmp_ctx);
3475                 return -1;      
3476         }
3477
3478         if (reclockfile == NULL) {
3479                 if (ctdb->recovery_lock_file != NULL) {
3480                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));