server: create recdb.tdb.X in /var/ctdb/state/
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
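/* control and monitor timeouts are derived from the recover_timeout and
   recover_interval tunables */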
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit >= ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback: %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
258 {
259         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
260
261         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
262         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
263 }
264
265 /*
266   change recovery mode on all nodes
267  */
268 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
269 {
270         TDB_DATA data;
271         uint32_t *nodes;
272         TALLOC_CTX *tmp_ctx;
273
274         tmp_ctx = talloc_new(ctdb);
275         CTDB_NO_MEMORY(ctdb, tmp_ctx);
276
277         /* freeze all nodes */
278         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
279         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
280                 int i;
281
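                /* databases are frozen one priority level at a time
                   (1..NUM_DB_PRIORITIES) across all active nodes before the
                   recovery mode is changed */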
282                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
283                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
284                                                 nodes, i,
285                                                 CONTROL_TIMEOUT(),
286                                                 false, tdb_null,
287                                                 NULL,
288                                                 set_recmode_fail_callback,
289                                                 rec) != 0) {
290                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
291                                 talloc_free(tmp_ctx);
292                                 return -1;
293                         }
294                 }
295         }
296
297
298         data.dsize = sizeof(uint32_t);
299         data.dptr = (unsigned char *)&rec_mode;
300
301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
302                                         nodes, 0,
303                                         CONTROL_TIMEOUT(),
304                                         false, data,
305                                         NULL, NULL,
306                                         NULL) != 0) {
307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
308                 talloc_free(tmp_ctx);
309                 return -1;
310         }
311
312         talloc_free(tmp_ctx);
313         return 0;
314 }
315
316 /*
317   change the recovery master on all nodes
318  */
319 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
320 {
321         TDB_DATA data;
322         TALLOC_CTX *tmp_ctx;
323         uint32_t *nodes;
324
325         tmp_ctx = talloc_new(ctdb);
326         CTDB_NO_MEMORY(ctdb, tmp_ctx);
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&pnn;
330
331         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(), false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /* update all remote nodes to use the same db priority that we have
347    this can fail if the remote node has not yet been upgraded to
348    support this function, so we always return success and never fail
349    a recovery if this call fails.
350 */
351 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
352         struct ctdb_node_map *nodemap, 
353         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
354 {
355         int db;
356         uint32_t *nodes;
357
358         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
359
360         /* step through all local databases */
361         for (db=0; db<dbmap->num;db++) {
362                 TDB_DATA data;
363                 struct ctdb_db_priority db_prio;
364                 int ret;
365
366                 db_prio.db_id     = dbmap->dbs[db].dbid;
367                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
370                         continue;
371                 }
372
373                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
374
375                 data.dptr  = (uint8_t *)&db_prio;
376                 data.dsize = sizeof(db_prio);
377
378                 if (ctdb_client_async_control(ctdb,
379                                         CTDB_CONTROL_SET_DB_PRIORITY,
380                                         nodes, 0,
381                                         CONTROL_TIMEOUT(), false, data,
382                                         NULL, NULL,
383                                         NULL) != 0) {
384                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
385                 }
386         }
387
388         return 0;
389 }                       
390
391 /*
392   ensure all other nodes have attached to any databases that we have
393  */
394 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
395                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
396 {
397         int i, j, db, ret;
398         struct ctdb_dbid_map *remote_dbmap;
399
400         /* verify that all other nodes have all our databases */
401         for (j=0; j<nodemap->num; j++) {
402                 /* we don't need to check ourselves */
403                 if (nodemap->nodes[j].pnn == pnn) {
404                         continue;
405                 }
406                 /* dont check nodes that are unavailable */
407                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
408                         continue;
409                 }
410
411                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
412                                          mem_ctx, &remote_dbmap);
413                 if (ret != 0) {
414                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
415                         return -1;
416                 }
417
418                 /* step through all local databases */
419                 for (db=0; db<dbmap->num;db++) {
420                         const char *name;
421
422
423                         for (i=0;i<remote_dbmap->num;i++) {
424                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
425                                         break;
426                                 }
427                         }
428                         /* the remote node already has this database */
429                         if (i!=remote_dbmap->num) {
430                                 continue;
431                         }
432                         /* ok so we need to create this database */
433                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
434                                             mem_ctx, &name);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
437                                 return -1;
438                         }
439                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                            mem_ctx, name, dbmap->dbs[db].persistent);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   ensure we are attached to any databases that anyone else is attached to
454  */
455 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
456                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
457 {
458         int i, j, db, ret;
459         struct ctdb_dbid_map *remote_dbmap;
460
461         /* verify that we have all databases any other node has */
462         for (j=0; j<nodemap->num; j++) {
463                 /* we don't need to check ourselves */
464                 if (nodemap->nodes[j].pnn == pnn) {
465                         continue;
466                 }
467                 /* dont check nodes that are unavailable */
468                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
469                         continue;
470                 }
471
472                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
473                                          mem_ctx, &remote_dbmap);
474                 if (ret != 0) {
475                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
476                         return -1;
477                 }
478
479                 /* step through all databases on the remote node */
480                 for (db=0; db<remote_dbmap->num;db++) {
481                         const char *name;
482
483                         for (i=0;i<(*dbmap)->num;i++) {
484                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
485                                         break;
486                                 }
487                         }
488                         /* we already have this db locally */
489                         if (i!=(*dbmap)->num) {
490                                 continue;
491                         }
492                         /* ok so we need to create this database and
493                            rebuild dbmap
494                          */
495                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
496                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
497                         if (ret != 0) {
498                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
499                                           nodemap->nodes[j].pnn));
500                                 return -1;
501                         }
502                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
503                                            remote_dbmap->dbs[db].persistent);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
506                                 return -1;
507                         }
508                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
511                                 return -1;
512                         }
513                 }
514         }
515
516         return 0;
517 }
518
519
520 /*
521   pull the remote database contents from one node into the recdb
522  */
523 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
524                                     struct tdb_wrap *recdb, uint32_t dbid,
525                                     bool persistent)
526 {
527         int ret;
528         TDB_DATA outdata;
529         struct ctdb_marshall_buffer *reply;
530         struct ctdb_rec_data *rec;
531         int i;
532         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
533
534         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
535                                CONTROL_TIMEOUT(), &outdata);
536         if (ret != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
538                 talloc_free(tmp_ctx);
539                 return -1;
540         }
541
542         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
543
544         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
545                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
546                 talloc_free(tmp_ctx);
547                 return -1;
548         }
549         
550         rec = (struct ctdb_rec_data *)&reply->data[0];
551         
552         for (i=0;
553              i<reply->count;
554              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
555                 TDB_DATA key, data;
556                 struct ctdb_ltdb_header *hdr;
557                 TDB_DATA existing;
558                 
559                 key.dptr = &rec->data[0];
560                 key.dsize = rec->keylen;
561                 data.dptr = &rec->data[key.dsize];
562                 data.dsize = rec->datalen;
563                 
564                 hdr = (struct ctdb_ltdb_header *)data.dptr;
565
566                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
567                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
568                         talloc_free(tmp_ctx);
569                         return -1;
570                 }
571
572                 /* fetch the existing record, if any */
573                 existing = tdb_fetch(recdb->tdb, key);
574                 
575                 if (existing.dptr != NULL) {
576                         struct ctdb_ltdb_header header;
577                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
578                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
579                                          (unsigned)existing.dsize, srcnode));
580                                 free(existing.dptr);
581                                 talloc_free(tmp_ctx);
582                                 return -1;
583                         }
584                         header = *(struct ctdb_ltdb_header *)existing.dptr;
585                         free(existing.dptr);
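                        /* keep the existing record unless the pulled copy has
                           a higher rsn, or the rsns are equal and the existing
                           copy's dmaster is not the recovery master */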
586                         if (!(header.rsn < hdr->rsn ||
587                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
588                                 continue;
589                         }
590                 }
591                 
592                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
593                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
594                         talloc_free(tmp_ctx);
595                         return -1;                              
596                 }
597         }
598
599         talloc_free(tmp_ctx);
600
601         return 0;
602 }
603
604 /*
605   pull all the remote database contents into the recdb
606  */
607 static int pull_remote_database(struct ctdb_context *ctdb,
608                                 struct ctdb_recoverd *rec, 
609                                 struct ctdb_node_map *nodemap, 
610                                 struct tdb_wrap *recdb, uint32_t dbid,
611                                 bool persistent)
612 {
613         int j;
614
615         /* pull all records from all other nodes across onto this node
616            (this merges based on rsn)
617         */
618         for (j=0; j<nodemap->num; j++) {
619                 /* dont merge from nodes that are unavailable */
620                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
621                         continue;
622                 }
623                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
624                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
625                                  nodemap->nodes[j].pnn));
626                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
627                         return -1;
628                 }
629         }
630         
631         return 0;
632 }
633
634
635 /*
636   update flags on all active nodes
637  */
638 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
639 {
640         int ret;
641
642         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
643         if (ret != 0) {
644                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
645                 return -1;
646         }
647
648         return 0;
649 }
650
651 /*
652   ensure all nodes have the same vnnmap we do
653  */
654 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
655                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
656 {
657         int j, ret;
658
659         /* push the new vnn map out to all the nodes */
660         for (j=0; j<nodemap->num; j++) {
661                 /* dont push to nodes that are unavailable */
662                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
663                         continue;
664                 }
665
666                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
667                 if (ret != 0) {
668                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
669                         return -1;
670                 }
671         }
672
673         return 0;
674 }
675
676
677 struct vacuum_info {
678         struct vacuum_info *next, *prev;
679         struct ctdb_recoverd *rec;
680         uint32_t srcnode;
681         struct ctdb_db_context *ctdb_db;
682         struct ctdb_marshall_buffer *recs;
683         struct ctdb_rec_data *r;
684 };
685
686 static void vacuum_fetch_next(struct vacuum_info *v);
687
688 /*
689   called when a vacuum fetch has completed - just free it and do the next one
690  */
691 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
692 {
693         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
694         talloc_free(state);
695         vacuum_fetch_next(v);
696 }
697
698
699 /*
700   process the next element from the vacuum list
701 */
702 static void vacuum_fetch_next(struct vacuum_info *v)
703 {
704         struct ctdb_call call;
705         struct ctdb_rec_data *r;
706
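        /* migrate each record to this node by sending a no-op call flagged
           CTDB_IMMEDIATE_MIGRATION; records that are already local, or whose
           chainlock cannot be taken without blocking, are skipped */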
707         while (v->recs->count) {
708                 struct ctdb_client_call_state *state;
709                 TDB_DATA data;
710                 struct ctdb_ltdb_header *hdr;
711
712                 ZERO_STRUCT(call);
713                 call.call_id = CTDB_NULL_FUNC;
714                 call.flags = CTDB_IMMEDIATE_MIGRATION;
715
716                 r = v->r;
717                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
718                 v->recs->count--;
719
720                 call.key.dptr = &r->data[0];
721                 call.key.dsize = r->keylen;
722
723                 /* ensure we don't block this daemon - just skip a record if we can't get
724                    the chainlock */
725                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
726                         continue;
727                 }
728
729                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
730                 if (data.dptr == NULL) {
731                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
732                         continue;
733                 }
734
735                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
736                         free(data.dptr);
737                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
738                         continue;
739                 }
740                 
741                 hdr = (struct ctdb_ltdb_header *)data.dptr;
742                 if (hdr->dmaster == v->rec->ctdb->pnn) {
743                         /* its already local */
744                         free(data.dptr);
745                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
746                         continue;
747                 }
748
749                 free(data.dptr);
750
751                 state = ctdb_call_send(v->ctdb_db, &call);
752                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
753                 if (state == NULL) {
754                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
755                         talloc_free(v);
756                         return;
757                 }
758                 state->async.fn = vacuum_fetch_callback;
759                 state->async.private_data = v;
760                 return;
761         }
762
763         talloc_free(v);
764 }
765
766
767 /*
768   destroy a vacuum info structure
769  */
770 static int vacuum_info_destructor(struct vacuum_info *v)
771 {
772         DLIST_REMOVE(v->rec->vacuum_info, v);
773         return 0;
774 }
775
776
777 /*
778   handler for vacuum fetch
779 */
780 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
781                                  TDB_DATA data, void *private_data)
782 {
783         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
784         struct ctdb_marshall_buffer *recs;
785         int ret, i;
786         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
787         const char *name;
788         struct ctdb_dbid_map *dbmap=NULL;
789         bool persistent = false;
790         struct ctdb_db_context *ctdb_db;
791         struct ctdb_rec_data *r;
792         uint32_t srcnode;
793         struct vacuum_info *v;
794
795         recs = (struct ctdb_marshall_buffer *)data.dptr;
796         r = (struct ctdb_rec_data *)&recs->data[0];
797
798         if (recs->count == 0) {
799                 talloc_free(tmp_ctx);
800                 return;
801         }
802
803         srcnode = r->reqid;
804
805         for (v=rec->vacuum_info;v;v=v->next) {
806                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
807                         /* we're already working on records from this node */
808                         talloc_free(tmp_ctx);
809                         return;
810                 }
811         }
812
813         /* work out if the database is persistent */
814         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
815         if (ret != 0) {
816                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
817                 talloc_free(tmp_ctx);
818                 return;
819         }
820
821         for (i=0;i<dbmap->num;i++) {
822                 if (dbmap->dbs[i].dbid == recs->db_id) {
823                         persistent = dbmap->dbs[i].persistent;
824                         break;
825                 }
826         }
827         if (i == dbmap->num) {
828                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
829                 talloc_free(tmp_ctx);
830                 return;         
831         }
832
833         /* find the name of this database */
834         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
835                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
836                 talloc_free(tmp_ctx);
837                 return;
838         }
839
840         /* attach to it */
841         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
842         if (ctdb_db == NULL) {
843                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
844                 talloc_free(tmp_ctx);
845                 return;
846         }
847
848         v = talloc_zero(rec, struct vacuum_info);
849         if (v == NULL) {
850                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
851                 talloc_free(tmp_ctx);
852                 return;
853         }
854
855         v->rec = rec;
856         v->srcnode = srcnode;
857         v->ctdb_db = ctdb_db;
858         v->recs = talloc_memdup(v, recs, data.dsize);
859         if (v->recs == NULL) {
860                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
861                 talloc_free(v);
862                 talloc_free(tmp_ctx);
863                 return;         
864         }
865         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
866
867         DLIST_ADD(rec->vacuum_info, v);
868
869         talloc_set_destructor(v, vacuum_info_destructor);
870
871         vacuum_fetch_next(v);
872         talloc_free(tmp_ctx);
873 }
874
875
876 /*
877   called when ctdb_wait_timeout should finish
878  */
879 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
880                               struct timeval yt, void *p)
881 {
882         uint32_t *timed_out = (uint32_t *)p;
883         (*timed_out) = 1;
884 }
885
886 /*
887   wait for a given number of seconds
888  */
889 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
890 {
891         uint32_t timed_out = 0;
892         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
893         while (!timed_out) {
894                 event_loop_once(ctdb->ev);
895         }
896 }
897
898 /*
899   called when an election times out (ends)
900  */
901 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
902                                   struct timeval t, void *p)
903 {
904         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
905         rec->election_timeout = NULL;
906
907         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
908 }
909
910
911 /*
912   wait for an election to finish. It finishes election_timeout seconds after
913   the last election packet is received
914  */
915 static void ctdb_wait_election(struct ctdb_recoverd *rec)
916 {
917         struct ctdb_context *ctdb = rec->ctdb;
918         while (rec->election_timeout) {
919                 event_loop_once(ctdb->ev);
920         }
921 }
922
923 /*
924   Update our local flags from all remote connected nodes. 
925   This is only run when we are, or believe we are, the recovery master
926  */
927 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
928 {
929         int j;
930         struct ctdb_context *ctdb = rec->ctdb;
931         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
932
933         /* get the nodemap for all active remote nodes and verify
934            they are the same as for this node
935          */
936         for (j=0; j<nodemap->num; j++) {
937                 struct ctdb_node_map *remote_nodemap=NULL;
938                 int ret;
939
940                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
941                         continue;
942                 }
943                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
944                         continue;
945                 }
946
947                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
948                                            mem_ctx, &remote_nodemap);
949                 if (ret != 0) {
950                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
951                                   nodemap->nodes[j].pnn));
952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
953                         talloc_free(mem_ctx);
954                         return MONITOR_FAILED;
955                 }
956                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
957                         /* We should tell our daemon about this so it
958                            updates its flags or else we will log the same 
959                            message again in the next iteration of recovery.
960                            Since we are the recovery master we can just as
961                            well update the flags on all nodes.
962                         */
963                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
964                         if (ret != 0) {
965                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
966                                 talloc_free(mem_ctx);
                                    return MONITOR_FAILED;
967                         }
968
969                         /* Update our local copy of the flags in the recovery
970                            daemon.
971                         */
972                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
973                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
974                                  nodemap->nodes[j].flags));
975                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
976                 }
977                 talloc_free(remote_nodemap);
978         }
979         talloc_free(mem_ctx);
980         return MONITOR_OK;
981 }
982
983
984 /* Create a new random generation id.
985    The generation id cannot be the INVALID_GENERATION id
986 */
987 static uint32_t new_generation(void)
988 {
989         uint32_t generation;
990
991         while (1) {
992                 generation = random();
993
994                 if (generation != INVALID_GENERATION) {
995                         break;
996                 }
997         }
998
999         return generation;
1000 }
1001
1002
1003 /*
1004   create a temporary working database
1005  */
1006 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1007 {
1008         char *name;
1009         struct tdb_wrap *recdb;
1010         unsigned tdb_flags;
1011
1012         /* open up the temporary recovery database */
1013         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1014                                ctdb->db_directory_state,
1015                                ctdb->pnn);
1016         if (name == NULL) {
1017                 return NULL;
1018         }
1019         unlink(name);
1020
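        /* the temporary recovery db is only accessed by this daemon, so
           tdb locking is not needed */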
1021         tdb_flags = TDB_NOLOCK;
1022         if (!ctdb->do_setsched) {
1023                 tdb_flags |= TDB_NOMMAP;
1024         }
1025         tdb_flags |= TDB_DISALLOW_NESTING;
1026
1027         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1028                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1029         if (recdb == NULL) {
1030                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1031         }
1032
1033         talloc_free(name);
1034
1035         return recdb;
1036 }
1037
1038
1039 /* 
1040    a traverse function for pulling all relevant records from recdb
1041  */
1042 struct recdb_data {
1043         struct ctdb_context *ctdb;
1044         struct ctdb_marshall_buffer *recdata;
1045         uint32_t len;
1046         bool failed;
1047         bool persistent;
1048 };
1049
1050 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1051 {
1052         struct recdb_data *params = (struct recdb_data *)p;
1053         struct ctdb_rec_data *rec;
1054         struct ctdb_ltdb_header *hdr;
1055
1056         /* skip empty records */
1057         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1058                 return 0;
1059         }
1060
1061         /* update the dmaster field to point to us (only for non-persistent databases) */
1062         hdr = (struct ctdb_ltdb_header *)data.dptr;
1063         if (!params->persistent) {
1064                 hdr->dmaster = params->ctdb->pnn;
1065         }
1066
1067         /* add the record to the blob ready to send to the nodes */
1068         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1069         if (rec == NULL) {
1070                 params->failed = true;
1071                 return -1;
1072         }
1073         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1074         if (params->recdata == NULL) {
1075                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1076                          rec->length + params->len, params->recdata->count));
1077                 params->failed = true;
1078                 return -1;
1079         }
1080         params->recdata->count++;
1081         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1082         params->len += rec->length;
1083         talloc_free(rec);
1084
1085         return 0;
1086 }
1087
1088 /*
1089   push the recdb database out to all nodes
1090  */
1091 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1092                                bool persistent,
1093                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1094 {
1095         struct recdb_data params;
1096         struct ctdb_marshall_buffer *recdata;
1097         TDB_DATA outdata;
1098         TALLOC_CTX *tmp_ctx;
1099         uint32_t *nodes;
1100
1101         tmp_ctx = talloc_new(ctdb);
1102         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1103
1104         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1105         CTDB_NO_MEMORY(ctdb, recdata);
1106
1107         recdata->db_id = dbid;
1108
1109         params.ctdb = ctdb;
1110         params.recdata = recdata;
1111         params.len = offsetof(struct ctdb_marshall_buffer, data);
1112         params.failed = false;
1113         params.persistent = persistent;
1114
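        /* marshall every record in the recdb into a single blob which is then
           pushed to all active nodes with CTDB_CONTROL_PUSH_DB */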
1115         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1116                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1117                 talloc_free(params.recdata);
1118                 talloc_free(tmp_ctx);
1119                 return -1;
1120         }
1121
1122         if (params.failed) {
1123                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1124                 talloc_free(params.recdata);
1125                 talloc_free(tmp_ctx);
1126                 return -1;              
1127         }
1128
1129         recdata = params.recdata;
1130
1131         outdata.dptr = (void *)recdata;
1132         outdata.dsize = params.len;
1133
1134         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1135         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1136                                         nodes, 0,
1137                                         CONTROL_TIMEOUT(), false, outdata,
1138                                         NULL, NULL,
1139                                         NULL) != 0) {
1140                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1141                 talloc_free(recdata);
1142                 talloc_free(tmp_ctx);
1143                 return -1;
1144         }
1145
1146         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x containing %u records\n", 
1147                   dbid, recdata->count));
1148
1149         talloc_free(recdata);
1150         talloc_free(tmp_ctx);
1151
1152         return 0;
1153 }
1154
1155
1156 /*
1157   go through a full recovery on one database 
1158  */
1159 static int recover_database(struct ctdb_recoverd *rec, 
1160                             TALLOC_CTX *mem_ctx,
1161                             uint32_t dbid,
1162                             bool persistent,
1163                             uint32_t pnn, 
1164                             struct ctdb_node_map *nodemap,
1165                             uint32_t transaction_id)
1166 {
1167         struct tdb_wrap *recdb;
1168         int ret;
1169         struct ctdb_context *ctdb = rec->ctdb;
1170         TDB_DATA data;
1171         struct ctdb_control_wipe_database w;
1172         uint32_t *nodes;
1173
1174         recdb = create_recdb(ctdb, mem_ctx);
1175         if (recdb == NULL) {
1176                 return -1;
1177         }
1178
1179         /* pull all remote databases onto the recdb */
1180         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1181         if (ret != 0) {
1182                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1183                 return -1;
1184         }
1185
1186         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1187
1188         /* wipe all the remote databases. This is safe as we are in a transaction */
1189         w.db_id = dbid;
1190         w.transaction_id = transaction_id;
1191
1192         data.dptr = (void *)&w;
1193         data.dsize = sizeof(w);
1194
1195         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1197                                         nodes, 0,
1198                                         CONTROL_TIMEOUT(), false, data,
1199                                         NULL, NULL,
1200                                         NULL) != 0) {
1201                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1202                 talloc_free(recdb);
1203                 return -1;
1204         }
1205         
1206         /* push out the correct database. This sets the dmaster and skips 
1207            the empty records */
1208         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1209         if (ret != 0) {
1210                 talloc_free(recdb);
1211                 return -1;
1212         }
1213
1214         /* all done with this database */
1215         talloc_free(recdb);
1216
1217         return 0;
1218 }
1219
1220 /*
1221   reload the nodes file 
1222 */
1223 static void reload_nodes_file(struct ctdb_context *ctdb)
1224 {
1225         ctdb->nodes = NULL;
1226         ctdb_load_nodes_file(ctdb);
1227 }
1228
1229         
1230 /*
1231   we are the recmaster, and recovery is needed - start a recovery run
1232  */
1233 static int do_recovery(struct ctdb_recoverd *rec, 
1234                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1235                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1236 {
1237         struct ctdb_context *ctdb = rec->ctdb;
1238         int i, j, ret;
1239         uint32_t generation;
1240         struct ctdb_dbid_map *dbmap;
1241         TDB_DATA data;
1242         uint32_t *nodes;
1243         struct timeval start_time;
1244
1245         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1246
1247         /* if recovery fails, force it again */
1248         rec->need_recovery = true;
1249
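        /* ban any node that has accumulated at least twice as many culprit
           credits as there are nodes in the cluster */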
1250         for (i=0; i<ctdb->num_nodes; i++) {
1251                 struct ctdb_banning_state *ban_state;
1252
1253                 if (ctdb->nodes[i]->ban_state == NULL) {
1254                         continue;
1255                 }
1256                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1257                 if (ban_state->count < 2*ctdb->num_nodes) {
1258                         continue;
1259                 }
1260                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1261                         ctdb->nodes[i]->pnn, ban_state->count,
1262                         ctdb->tunable.recovery_ban_period));
1263                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1264                 ban_state->count = 0;
1265         }
1266
1267
1268         if (ctdb->tunable.verify_recovery_lock != 0) {
1269                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1270                 start_time = timeval_current();
1271                 if (!ctdb_recovery_lock(ctdb, true)) {
1272                         ctdb_set_culprit(rec, pnn);
1273                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1274                         return -1;
1275                 }
1276                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1277                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1278         }
1279
1280         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1281
1282         /* get a list of all databases */
1283         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1284         if (ret != 0) {
1285                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1286                 return -1;
1287         }
1288
1289         /* we do the db creation before we set the recovery mode, so the freeze happens
1290            on all databases we will be dealing with. */
1291
1292         /* verify that we have all the databases any other node has */
1293         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1294         if (ret != 0) {
1295                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1296                 return -1;
1297         }
1298
1299         /* verify that all other nodes have all our databases */
1300         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1301         if (ret != 0) {
1302                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1303                 return -1;
1304         }
1305         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1306
1307         /* update the database priority for all remote databases */
1308         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1309         if (ret != 0) {
1310                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1311         }
1312         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1313
1314
1315         /* set recovery mode to active on all nodes */
1316         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1317         if (ret != 0) {
1318                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1319                 return -1;
1320         }
1321
1322         /* execute the "startrecovery" event script on all nodes */
1323         ret = run_startrecovery_eventscript(rec, nodemap);
1324         if (ret!=0) {
1325                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1326                 return -1;
1327         }
1328
1329         /* pick a new generation number */
1330         generation = new_generation();
1331
1332         /* change the vnnmap on this node to use the new generation 
1333            number but not on any other nodes.
1334            this guarantees that if we abort the recovery prematurely
1335            for some reason (a node stops responding?)
1336            that we can just return immediately and we will reenter
1337            recovery shortly again.
1338            I.e. we deliberately leave the cluster with an inconsistent
1339            generation id to allow us to abort recovery at any stage and
1340            just restart it from scratch.
1341          */
1342         vnnmap->generation = generation;
1343         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1344         if (ret != 0) {
1345                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1346                 return -1;
1347         }
1348
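        /* start a recovery transaction on all active nodes, tagged with the
           new generation; the databases are wiped and repopulated inside this
           transaction and committed below */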
1349         data.dptr = (void *)&generation;
1350         data.dsize = sizeof(uint32_t);
1351
1352         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1353         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1354                                         nodes, 0,
1355                                         CONTROL_TIMEOUT(), false, data,
1356                                         NULL,
1357                                         transaction_start_fail_callback,
1358                                         rec) != 0) {
1359                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1360                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1361                                         nodes, 0,
1362                                         CONTROL_TIMEOUT(), false, tdb_null,
1363                                         NULL,
1364                                         NULL,
1365                                         NULL) != 0) {
1366                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1367                 }
1368                 return -1;
1369         }
1370
1371         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1372
1373         for (i=0;i<dbmap->num;i++) {
1374                 ret = recover_database(rec, mem_ctx,
1375                                        dbmap->dbs[i].dbid,
1376                                        dbmap->dbs[i].persistent,
1377                                        pnn, nodemap, generation);
1378                 if (ret != 0) {
1379                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1380                         return -1;
1381                 }
1382         }
1383
1384         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1385
1386         /* commit all the changes */
1387         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1388                                         nodes, 0,
1389                                         CONTROL_TIMEOUT(), false, data,
1390                                         NULL, NULL,
1391                                         NULL) != 0) {
1392                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1393                 return -1;
1394         }
1395
1396         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1397         
1398
1399         /* update the capabilities for all nodes */
1400         ret = update_capabilities(ctdb, nodemap);
1401         if (ret!=0) {
1402                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1403                 return -1;
1404         }
1405
1406         /* build a new vnn map with all the currently active and
1407            unbanned nodes */
1408         generation = new_generation();
1409         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1410         CTDB_NO_MEMORY(ctdb, vnnmap);
1411         vnnmap->generation = generation;
1412         vnnmap->size = 0;
1413         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1414         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1415         for (i=j=0;i<nodemap->num;i++) {
1416                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1417                         continue;
1418                 }
1419                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1420                         /* this node cannot be an lmaster */
1421                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1422                         continue;
1423                 }
1424
1425                 vnnmap->size++;
1426                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1427                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1428                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1429
1430         }
1431         if (vnnmap->size == 0) {
1432                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1433                 vnnmap->size++;
1434                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1435                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1436                 vnnmap->map[0] = pnn;
1437         }       
1438
1439         /* update to the new vnnmap on all nodes */
1440         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1441         if (ret != 0) {
1442                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1443                 return -1;
1444         }
1445
1446         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1447
1448         /* update recmaster to point to us for all nodes */
1449         ret = set_recovery_master(ctdb, nodemap, pnn);
1450         if (ret!=0) {
1451                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1452                 return -1;
1453         }
1454
1455         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1456
1457         /*
1458           update all nodes to have the same flags that we have
1459          */
1460         for (i=0;i<nodemap->num;i++) {
1461                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1462                         continue;
1463                 }
1464
1465                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1466                 if (ret != 0) {
1467                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1468                         return -1;
1469                 }
1470         }
1471
1472         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1473
1474         /* disable recovery mode */
1475         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1476         if (ret != 0) {
1477                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1478                 return -1;
1479         }
1480
1481         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1482
1483         /*
1484           tell nodes to takeover their public IPs
1485          */
1486         rec->need_takeover_run = false;
1487         ret = ctdb_takeover_run(ctdb, nodemap);
1488         if (ret != 0) {
1489                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1490                 return -1;
1491         }
1492         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1493
1494         /* execute the "recovered" event script on all nodes */
1495         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1496         if (ret!=0) {
1497                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1498                 return -1;
1499         }
1500
1501         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1502
1503         /* send a message to all clients telling them that the cluster 
1504            has been reconfigured */
1505         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1506
1507         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1508
1509         rec->need_recovery = false;
1510
1511         /* we managed to complete a full recovery, make sure to forgive
1512            any past sins by the nodes that could now participate in the
1513            recovery.
1514         */
1515         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1516         for (i=0;i<nodemap->num;i++) {
1517                 struct ctdb_banning_state *ban_state;
1518
1519                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1520                         continue;
1521                 }
1522
1523                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1524                 if (ban_state == NULL) {
1525                         continue;
1526                 }
1527
1528                 ban_state->count = 0;
1529         }
1530
1531
1532         /* We just finished a recovery successfully. 
1533            We now wait for rerecovery_timeout before we allow 
1534            another recovery to take place.
1535         */
1536         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1537         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1538         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1539
1540         return 0;
1541 }
1542
1543
1544 /*
1545   elections are won by first checking the number of connected nodes, then
1546   the priority time, then the pnn
1547  */
1548 struct election_message {
1549         uint32_t num_connected;
1550         struct timeval priority_time;
1551         uint32_t pnn;
1552         uint32_t node_flags;
1553 };
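/* A worked example of the ordering above, derived from ctdb_election_win()
   below: a banned or stopped candidate loses outright; otherwise the
   candidate that sees more connected nodes wins; if that is equal, the one
   whose recovery daemon has been running longest (earliest priority_time)
   wins; and as a final tie-break the higher pnn wins. */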
1554
1555 /*
1556   form this node's election data
1557  */
1558 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1559 {
1560         int ret, i;
1561         struct ctdb_node_map *nodemap;
1562         struct ctdb_context *ctdb = rec->ctdb;
1563
1564         ZERO_STRUCTP(em);
1565
1566         em->pnn = rec->ctdb->pnn;
1567         em->priority_time = rec->priority_time;
1568
1569         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1570         if (ret != 0) {
1571                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1572                 return;
1573         }
1574
1575         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1576         em->node_flags = rec->node_flags;
1577
1578         for (i=0;i<nodemap->num;i++) {
1579                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1580                         em->num_connected++;
1581                 }
1582         }
1583
1584         /* we shouldn't try to win this election if we can't be a recmaster */
1585         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1586                 em->num_connected = 0;
1587                 em->priority_time = timeval_current();
1588         }
1589
1590         talloc_free(nodemap);
1591 }
1592
1593 /*
1594   see if the given election data wins
1595  */
1596 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1597 {
1598         struct election_message myem;
1599         int cmp = 0;
1600
1601         ctdb_election_data(rec, &myem);
1602
1603         /* we can't win if we don't have the recmaster capability */
1604         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1605                 return false;
1606         }
1607
1608         /* we can't win if we are banned */
1609         if (rec->node_flags & NODE_FLAGS_BANNED) {
1610                 return false;
1611         }
1612
1613         /* we can't win if we are stopped */
1614         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1615                 return false;
1616         }
1617
1618         /* we will automatically win if the other node is banned */
1619         if (em->node_flags & NODE_FLAGS_BANNED) {
1620                 return true;
1621         }
1622
1623         /* we will automatically win if the other node is stopped */
1624         if (em->node_flags & NODE_FLAGS_STOPPED) {
1625                 return true;
1626         }
1627
1628         /* try to use the most connected node */
1629         if (cmp == 0) {
1630                 cmp = (int)myem.num_connected - (int)em->num_connected;
1631         }
1632
1633         /* then the longest running node */
1634         if (cmp == 0) {
1635                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1636         }
1637
1638         if (cmp == 0) {
1639                 cmp = (int)myem.pnn - (int)em->pnn;
1640         }
1641
1642         return cmp > 0;
1643 }
1644
1645 /*
1646   send out an election request
1647  */
1648 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1649 {
1650         int ret;
1651         TDB_DATA election_data;
1652         struct election_message emsg;
1653         uint64_t srvid;
1654         struct ctdb_context *ctdb = rec->ctdb;
1655
1656         srvid = CTDB_SRVID_RECOVERY;
1657
1658         ctdb_election_data(rec, &emsg);
1659
1660         election_data.dsize = sizeof(struct election_message);
1661         election_data.dptr  = (unsigned char *)&emsg;
1662
1663
1664         /* send an election message to all active nodes */
1665         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1666         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1667
1668
1669         /* A new node that is already frozen has entered the cluster.
1670            The existing nodes are not frozen and don't need to be frozen
1671            until the election has ended and we start the actual recovery
1672         */
1673         if (update_recmaster == true) {
1674                 /* first we assume we will win the election and set the
1675                    recovery master to be ourselves on the current node
1676                  */
1677                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1678                 if (ret != 0) {
1679                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1680                         return -1;
1681                 }
1682         }
1683
1684
1685         return 0;
1686 }
1687
1688 /*
1689   this function will unban all nodes in the cluster
1690 */
1691 static void unban_all_nodes(struct ctdb_context *ctdb)
1692 {
1693         int ret, i;
1694         struct ctdb_node_map *nodemap;
1695         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1696         
1697         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1698         if (ret != 0) {
1699                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1700                 return;
1701         }
1702
1703         for (i=0;i<nodemap->num;i++) {
1704                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1705                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1706                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1707                 }
1708         }
1709
1710         talloc_free(tmp_ctx);
1711 }
1712
1713
1714 /*
1715   we think we are winning the election - send a broadcast election request
1716  */
1717 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1718 {
1719         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1720         int ret;
1721
1722         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1723         if (ret != 0) {
1724                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1725         }
1726
1727         talloc_free(rec->send_election_te);
1728         rec->send_election_te = NULL;
1729 }
1730
1731 /*
1732   handler for memory dumps
1733 */
1734 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1735                              TDB_DATA data, void *private_data)
1736 {
1737         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1738         TDB_DATA *dump;
1739         int ret;
1740         struct rd_memdump_reply *rd;
1741
1742         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1743                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1744                 talloc_free(tmp_ctx);
1745                 return;
1746         }
1747         rd = (struct rd_memdump_reply *)data.dptr;
1748
1749         dump = talloc_zero(tmp_ctx, TDB_DATA);
1750         if (dump == NULL) {
1751                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1752                 talloc_free(tmp_ctx);
1753                 return;
1754         }
1755         ret = ctdb_dump_memory(ctdb, dump);
1756         if (ret != 0) {
1757                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1758                 talloc_free(tmp_ctx);
1759                 return;
1760         }
1761
1762         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1763
1764         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1765         if (ret != 0) {
1766                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1767                 talloc_free(tmp_ctx);
1768                 return;
1769         }
1770
1771         talloc_free(tmp_ctx);
1772 }
1773
1774 /*
1775   handler for reload_nodes
1776 */
1777 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1778                              TDB_DATA data, void *private_data)
1779 {
1780         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1781
1782         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1783
1784         reload_nodes_file(rec->ctdb);
1785 }
1786
1787
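/* timed event callback: the disable window set up by
   disable_ip_check_handler() below has expired, so re-enable the public ip
   address check by freeing the disable context */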
1788 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1789                               struct timeval yt, void *p)
1790 {
1791         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1792
1793         talloc_free(rec->ip_check_disable_ctx);
1794         rec->ip_check_disable_ctx = NULL;
1795 }
1796
1797 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1798                              TDB_DATA data, void *private_data)
1799 {
1800         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1801         uint32_t timeout;
1802
1803         if (rec->ip_check_disable_ctx != NULL) {
1804                 talloc_free(rec->ip_check_disable_ctx);
1805                 rec->ip_check_disable_ctx = NULL;
1806         }
1807
1808         if (data.dsize != sizeof(uint32_t)) {
1809                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1810                                  "expecting %lu\n", (long unsigned)data.dsize,
1811                                  (long unsigned)sizeof(uint32_t)));
1812                 return;
1813         }
1814         if (data.dptr == NULL) {
1815                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1816                 return;
1817         }
1818
1819         timeout = *((uint32_t *)data.dptr);
1820         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1821
1822         rec->ip_check_disable_ctx = talloc_new(rec);
1823         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1824
1825         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1826 }
1827
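/* Illustrative sketch only (not part of this file) of what a sender of
   CTDB_SRVID_DISABLE_IP_CHECK could look like, given
   disable_ip_check_handler() above; "recmaster" is a hypothetical pnn
   variable:

        uint32_t timeout = 60;
        TDB_DATA data;
        data.dptr  = (uint8_t *)&timeout;
        data.dsize = sizeof(timeout);
        ctdb_send_message(ctdb, recmaster, CTDB_SRVID_DISABLE_IP_CHECK, data);
*/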
1828
1829 /*
1830   handler for ip reallocate, just add it to the list of callers and 
1831   handle this later in the monitor_cluster loop so we do not recurse
1832   with other callers to takeover_run()
1833 */
1834 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1835                              TDB_DATA data, void *private_data)
1836 {
1837         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1838         struct ip_reallocate_list *caller;
1839
1840         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1841                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1842                 return;
1843         }
1844
1845         if (rec->ip_reallocate_ctx == NULL) {
1846                 rec->ip_reallocate_ctx = talloc_new(rec);
1847                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1848         }
1849
1850         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1851         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1852
1853         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1854         caller->next = rec->reallocate_callers;
1855         rec->reallocate_callers = caller;
1856
1857         return;
1858 }
1859
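/* called from the monitor_cluster loop on the recovery master: perform a
   single takeover run and send the result back to every caller queued up
   by ip_reallocate_handler() above (callers that passed srvid==0 get no
   reply) */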
1860 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1861 {
1862         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1863         TDB_DATA result;
1864         int32_t ret;
1865         struct ip_reallocate_list *callers;
1866
1867         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1868         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1869         result.dsize = sizeof(int32_t);
1870         result.dptr  = (uint8_t *)&ret;
1871
1872         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1873
1874                 /* Someone that sent srvid==0 does not want a reply */
1875                 if (callers->rd->srvid == 0) {
1876                         continue;
1877                 }
1878                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1879                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1880                                   (unsigned long long)callers->rd->srvid));
1881                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1882                 if (ret != 0) {
1883                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1884                                          "message to %u:%llu\n",
1885                                          (unsigned)callers->rd->pnn,
1886                                          (unsigned long long)callers->rd->srvid));
1887                 }
1888         }
1889
1890         talloc_free(tmp_ctx);
1891         talloc_free(rec->ip_reallocate_ctx);
1892         rec->ip_reallocate_ctx = NULL;
1893         rec->reallocate_callers = NULL;
1894         
1895 }
1896
1897
1898 /*
1899   handler for recovery master elections
1900 */
1901 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1902                              TDB_DATA data, void *private_data)
1903 {
1904         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1905         int ret;
1906         struct election_message *em = (struct election_message *)data.dptr;
1907         TALLOC_CTX *mem_ctx;
1908
1909         /* we got an election packet - update the timeout for the election */
1910         talloc_free(rec->election_timeout);
1911         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1912                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1913                                                 ctdb_election_timeout, rec);
1914
1915         mem_ctx = talloc_new(ctdb);
1916
1917         /* someone called an election. check their election data
1918            and if we disagree and we would rather be the elected node, 
1919            send a new election message to all other nodes
1920          */
1921         if (ctdb_election_win(rec, em)) {
1922                 if (!rec->send_election_te) {
1923                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1924                                                                 timeval_current_ofs(0, 500000),
1925                                                                 election_send_request, rec);
1926                 }
1927                 talloc_free(mem_ctx);
1928                 /*unban_all_nodes(ctdb);*/
1929                 return;
1930         }
1931         
1932         /* we didn't win */
1933         talloc_free(rec->send_election_te);
1934         rec->send_election_te = NULL;
1935
1936         if (ctdb->tunable.verify_recovery_lock != 0) {
1937                 /* release the recmaster lock */
1938                 if (em->pnn != ctdb->pnn &&
1939                     ctdb->recovery_lock_fd != -1) {
1940                         close(ctdb->recovery_lock_fd);
1941                         ctdb->recovery_lock_fd = -1;
1942                         unban_all_nodes(ctdb);
1943                 }
1944         }
1945
1946         /* ok, let that guy become recmaster then */
1947         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1948         if (ret != 0) {
1949                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
1950                 talloc_free(mem_ctx);
1951                 return;
1952         }
1953
1954         talloc_free(mem_ctx);
1955         return;
1956 }
1957
1958
1959 /*
1960   force the start of the election process
1961  */
1962 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1963                            struct ctdb_node_map *nodemap)
1964 {
1965         int ret;
1966         struct ctdb_context *ctdb = rec->ctdb;
1967
1968         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1969
1970         /* set all nodes to recovery mode to stop all internode traffic */
1971         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1972         if (ret != 0) {
1973                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1974                 return;
1975         }
1976
1977         talloc_free(rec->election_timeout);
1978         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1979                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1980                                                 ctdb_election_timeout, rec);
1981
1982         ret = send_election_request(rec, pnn, true);
1983         if (ret!=0) {
1984                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
1985                 return;
1986         }
1987
1988         /* wait for a few seconds to collect all responses */
1989         ctdb_wait_election(rec);
1990 }
1991
1992
1993
1994 /*
1995   handler for when a node changes its flags
1996 */
1997 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1998                             TDB_DATA data, void *private_data)
1999 {
2000         int ret;
2001         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2002         struct ctdb_node_map *nodemap=NULL;
2003         TALLOC_CTX *tmp_ctx;
2004         uint32_t changed_flags;
2005         int i;
2006         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2007         int disabled_flag_changed;
2008
2009         if (data.dsize != sizeof(*c)) {
2010                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2011                 return;
2012         }
2013
2014         tmp_ctx = talloc_new(ctdb);
2015         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2016
2017         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2018         if (ret != 0) {
2019                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2020                 talloc_free(tmp_ctx);
2021                 return;         
2022         }
2023
2024
2025         for (i=0;i<nodemap->num;i++) {
2026                 if (nodemap->nodes[i].pnn == c->pnn) break;
2027         }
2028
2029         if (i == nodemap->num) {
2030                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2031                 talloc_free(tmp_ctx);
2032                 return;
2033         }
2034
2035         changed_flags = c->old_flags ^ c->new_flags;
2036
2037         if (nodemap->nodes[i].flags != c->new_flags) {
2038                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2039         }
2040
2041         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2042
2043         nodemap->nodes[i].flags = c->new_flags;
2044
2045         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2046                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2047
2048         if (ret == 0) {
2049                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2050                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2051         }
2052         
2053         if (ret == 0 &&
2054             ctdb->recovery_master == ctdb->pnn &&
2055             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2056                 /* Only do the takeover run if the "permanently disabled" or
2057                    "unhealthy" flags changed, since these cause an ip failover
2058                    but not a recovery.
2059                    If the node became disconnected or banned, that also leads
2060                    to an ip address failover, but that case is handled during
2061                    recovery
2062                 */
2063                 if (disabled_flag_changed) {
2064                         rec->need_takeover_run = true;
2065                 }
2066         }
2067
2068         talloc_free(tmp_ctx);
2069 }
2070
2071 /*
2072   handler for when we need to push out flag changes to all other nodes
2073 */
2074 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2075                             TDB_DATA data, void *private_data)
2076 {
2077         int ret;
2078         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2079         struct ctdb_node_map *nodemap=NULL;
2080         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2081         uint32_t recmaster;
2082         uint32_t *nodes;
2083
2084         /* find the recovery master */
2085         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2086         if (ret != 0) {
2087                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2088                 talloc_free(tmp_ctx);
2089                 return;
2090         }
2091
2092         /* read the node flags from the recmaster */
2093         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2094         if (ret != 0) {
2095                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2096                 talloc_free(tmp_ctx);
2097                 return;
2098         }
2099         if (c->pnn >= nodemap->num) {
2100                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2101                 talloc_free(tmp_ctx);
2102                 return;
2103         }
2104
2105         /* send the flags update to all connected nodes */
2106         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2107
2108         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2109                                       nodes, 0, CONTROL_TIMEOUT(),
2110                                       false, data,
2111                                       NULL, NULL,
2112                                       NULL) != 0) {
2113                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2114
2115                 talloc_free(tmp_ctx);
2116                 return;
2117         }
2118
2119         talloc_free(tmp_ctx);
2120 }
2121
2122
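/* context shared with the async getrecmode callbacks below: "count" tracks
   how many replies are still outstanding and "status" records the overall
   result */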
2123 struct verify_recmode_normal_data {
2124         uint32_t count;
2125         enum monitor_result status;
2126 };
2127
2128 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2129 {
2130         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2131
2132
2133         /* one more node has responded with recmode data*/
2134         rmdata->count--;
2135
2136         /* if we failed to get the recmode, then return an error and let
2137            the main loop try again.
2138         */
2139         if (state->state != CTDB_CONTROL_DONE) {
2140                 if (rmdata->status == MONITOR_OK) {
2141                         rmdata->status = MONITOR_FAILED;
2142                 }
2143                 return;
2144         }
2145
2146         /* if we got a response, then the recmode will be stored in the
2147            status field
2148         */
2149         if (state->status != CTDB_RECOVERY_NORMAL) {
2150                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2151                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2152         }
2153
2154         return;
2155 }
2156
2157
2158 /* verify that all nodes are in normal recovery mode */
2159 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2160 {
2161         struct verify_recmode_normal_data *rmdata;
2162         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2163         struct ctdb_client_control_state *state;
2164         enum monitor_result status;
2165         int j;
2166         
2167         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2168         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2169         rmdata->count  = 0;
2170         rmdata->status = MONITOR_OK;
2171
2172         /* loop over all active nodes and send an async getrecmode call to 
2173            them*/
2174         for (j=0; j<nodemap->num; j++) {
2175                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2176                         continue;
2177                 }
2178                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2179                                         CONTROL_TIMEOUT(), 
2180                                         nodemap->nodes[j].pnn);
2181                 if (state == NULL) {
2182                         /* we failed to send the control, treat this as 
2183                            an error and try again next iteration
2184                         */                      
2185                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2186                         talloc_free(mem_ctx);
2187                         return MONITOR_FAILED;
2188                 }
2189
2190                 /* set up the callback functions */
2191                 state->async.fn = verify_recmode_normal_callback;
2192                 state->async.private_data = rmdata;
2193
2194                 /* one more control to wait for to complete */
2195                 rmdata->count++;
2196         }
2197
2198
2199         /* now wait for up to the maximum number of seconds allowed
2200            or until all nodes we expect a response from have replied
2201         */
2202         while (rmdata->count > 0) {
2203                 event_loop_once(ctdb->ev);
2204         }
2205
2206         status = rmdata->status;
2207         talloc_free(mem_ctx);
2208         return status;
2209 }
2210
2211
2212 struct verify_recmaster_data {
2213         struct ctdb_recoverd *rec;
2214         uint32_t count;
2215         uint32_t pnn;
2216         enum monitor_result status;
2217 };
2218
2219 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2220 {
2221         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2222
2223
2224         /* one more node has responded with recmaster data*/
2225         rmdata->count--;
2226
2227         /* if we failed to get the recmaster, then return an error and let
2228            the main loop try again.
2229         */
2230         if (state->state != CTDB_CONTROL_DONE) {
2231                 if (rmdata->status == MONITOR_OK) {
2232                         rmdata->status = MONITOR_FAILED;
2233                 }
2234                 return;
2235         }
2236
2237         /* if we got a response, then the recmaster will be stored in the
2238            status field
2239         */
2240         if (state->status != rmdata->pnn) {
2241                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2242                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2243                 rmdata->status = MONITOR_ELECTION_NEEDED;
2244         }
2245
2246         return;
2247 }
2248
2249
2250 /* verify that all nodes agree that we are the recmaster */
2251 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2252 {
2253         struct ctdb_context *ctdb = rec->ctdb;
2254         struct verify_recmaster_data *rmdata;
2255         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2256         struct ctdb_client_control_state *state;
2257         enum monitor_result status;
2258         int j;
2259         
2260         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2261         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2262         rmdata->rec    = rec;
2263         rmdata->count  = 0;
2264         rmdata->pnn    = pnn;
2265         rmdata->status = MONITOR_OK;
2266
2267         /* loop over all active nodes and send an async getrecmaster call to 
2268            them*/
2269         for (j=0; j<nodemap->num; j++) {
2270                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2271                         continue;
2272                 }
2273                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2274                                         CONTROL_TIMEOUT(),
2275                                         nodemap->nodes[j].pnn);
2276                 if (state == NULL) {
2277                         /* we failed to send the control, treat this as 
2278                            an error and try again next iteration
2279                         */                      
2280                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2281                         talloc_free(mem_ctx);
2282                         return MONITOR_FAILED;
2283                 }
2284
2285                 /* set up the callback functions */
2286                 state->async.fn = verify_recmaster_callback;
2287                 state->async.private_data = rmdata;
2288
2289                 /* one more control to wait for to complete */
2290                 rmdata->count++;
2291         }
2292
2293
2294         /* now wait for up to the maximum number of seconds allowed
2295            or until all nodes we expect a response from have replied
2296         */
2297         while (rmdata->count > 0) {
2298                 event_loop_once(ctdb->ev);
2299         }
2300
2301         status = rmdata->status;
2302         talloc_free(mem_ctx);
2303         return status;
2304 }
2305
2306
2307 /* called to check that the allocation of public ip addresses is ok.
2308 */
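/* The check is guarded against racing with a recovery: we read the node
   uptime before and after fetching the public ip list and skip the check
   if a recovery started or finished in between, or is still in progress.
*/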
2309 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2310 {
2311         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2312         struct ctdb_all_public_ips *ips = NULL;
2313         struct ctdb_uptime *uptime1 = NULL;
2314         struct ctdb_uptime *uptime2 = NULL;
2315         int ret, j;
2316
2317         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2318                                 CTDB_CURRENT_NODE, &uptime1);
2319         if (ret != 0) {
2320                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2321                 talloc_free(mem_ctx);
2322                 return -1;
2323         }
2324
2325         /* read the ip allocation from the local node */
2326         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2327         if (ret != 0) {
2328                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2329                 talloc_free(mem_ctx);
2330                 return -1;
2331         }
2332
2333         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2334                                 CTDB_CURRENT_NODE, &uptime2);
2335         if (ret != 0) {
2336                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2337                 talloc_free(mem_ctx);
2338                 return -1;
2339         }
2340
2341         /* skip the check if the startrecovery time has changed */
2342         if (timeval_compare(&uptime1->last_recovery_started,
2343                             &uptime2->last_recovery_started) != 0) {
2344                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2345                 talloc_free(mem_ctx);
2346                 return 0;
2347         }
2348
2349         /* skip the check if the endrecovery time has changed */
2350         if (timeval_compare(&uptime1->last_recovery_finished,
2351                             &uptime2->last_recovery_finished) != 0) {
2352                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2353                 talloc_free(mem_ctx);
2354                 return 0;
2355         }
2356
2357         /* skip the check if we have started but not finished recovery */
2358         if (timeval_compare(&uptime1->last_recovery_finished,
2359                             &uptime1->last_recovery_started) != 1) {
2360                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2361                 talloc_free(mem_ctx);
2362
2363                 return 0;
2364         }
2365
2366         /* verify that we have the ip addresses we should have
2367            and we don't have ones we shouldn't have.
2368            if we find an inconsistency we set recmode to
2369            active on the local node and wait for the recmaster
2370            to do a full blown recovery
2371         */
2372         for (j=0; j<ips->num; j++) {
2373                 if (ips->ips[j].pnn == pnn) {
2374                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2375                                 struct takeover_run_reply rd;
2376                                 TDB_DATA data;
2377
2378                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2379                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2380
2381                                 rd.pnn   = ctdb->pnn;
2382                                 rd.srvid = 0;
2383                                 data.dptr = (uint8_t *)&rd;
2384                                 data.dsize = sizeof(rd);
2385
2386                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2387                                 if (ret != 0) {
2388                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2389                                 }
2390                         }
2391                 } else {
2392                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2393                                 struct takeover_run_reply rd;
2394                                 TDB_DATA data;
2395
2396                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2397                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2398
2399                                 rd.pnn   = ctdb->pnn;
2400                                 rd.srvid = 0;
2401                                 data.dptr = (uint8_t *)&rd;
2402                                 data.dsize = sizeof(rd);
2403
2404                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2405                                 if (ret != 0) {
2406                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2407                                 }
2408                         }
2409                 }
2410         }
2411
2412         talloc_free(mem_ctx);
2413         return 0;
2414 }
2415
2416
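/* callback for the async CTDB_CONTROL_GET_NODEMAP calls issued by
   get_remote_nodemaps() below: stash each remote nodemap in the
   remote_nodemaps array passed as callback_data, indexed by the responding
   node's pnn */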
2417 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2418 {
2419         struct ctdb_node_map **remote_nodemaps = callback_data;
2420
2421         if (node_pnn >= ctdb->num_nodes) {
2422                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2423                 return;
2424         }
2425
2426         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2427
2428 }
2429
2430 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2431         struct ctdb_node_map *nodemap,
2432         struct ctdb_node_map **remote_nodemaps)
2433 {
2434         uint32_t *nodes;
2435
2436         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2437         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2438                                         nodes, 0,
2439                                         CONTROL_TIMEOUT(), false, tdb_null,
2440                                         async_getnodemap_callback,
2441                                         NULL,
2442                                         remote_nodemaps) != 0) {
2443                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2444
2445                 return -1;
2446         }
2447
2448         return 0;
2449 }
2450
2451 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
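/* state for an asynchronous check of the recovery lock: a forked child
   pread()s the reclock file and reports one of the status values above
   back through the pipe in fd[], while te and fde drive the timeout and
   the pipe read in the parent */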
2452 struct ctdb_check_reclock_state {
2453         struct ctdb_context *ctdb;
2454         struct timeval start_time;
2455         int fd[2];
2456         pid_t child;
2457         struct timed_event *te;
2458         struct fd_event *fde;
2459         enum reclock_child_status status;
2460 };
2461
2462 /* when we free the reclock state we must kill any child process.
2463 */
2464 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2465 {
2466         struct ctdb_context *ctdb = state->ctdb;
2467
2468         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2469
2470         if (state->fd[0] != -1) {
2471                 close(state->fd[0]);
2472                 state->fd[0] = -1;
2473         }
2474         if (state->fd[1] != -1) {
2475                 close(state->fd[1]);
2476                 state->fd[1] = -1;
2477         }
2478         kill(state->child, SIGKILL);
2479         return 0;
2480 }
2481
2482 /*
2483   called if our check_reclock child times out. this would happen if
2484   i/o to the reclock file blocks.
2485  */
2486 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2487                                          struct timeval t, void *private_data)
2488 {
2489         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2490                                            struct ctdb_check_reclock_state);
2491
2492         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. Cluster filesystem slow to grant locks?\n"));
2493         state->status = RECLOCK_TIMEOUT;
2494 }
2495
2496 /* this is called when the child process has completed checking the reclock
2497    file and has written data back to us through the pipe.
2498 */
2499 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2500                              uint16_t flags, void *private_data)
2501 {
2502         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2503                                              struct ctdb_check_reclock_state);
2504         char c = 0;
2505         int ret;
2506
2507         /* we got a response from our child process so we can abort the
2508            timeout.
2509         */
2510         talloc_free(state->te);
2511         state->te = NULL;
2512
2513         ret = read(state->fd[0], &c, 1);
2514         if (ret != 1 || c != RECLOCK_OK) {
2515                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2516                 state->status = RECLOCK_FAILED;
2517
2518                 return;
2519         }
2520
2521         state->status = RECLOCK_OK;
2522         return;
2523 }
2524
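/* verify that we still hold the recovery lock: fork a child that does a
   pread() on the already open recovery_lock_fd and reports back through a
   pipe, so that a hung cluster filesystem cannot block the recovery daemon;
   the check gives up after a 15 second timeout */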
2525 static int check_recovery_lock(struct ctdb_context *ctdb)
2526 {
2527         int ret;
2528         struct ctdb_check_reclock_state *state;
2529         pid_t parent = getpid();
2530
2531         if (ctdb->recovery_lock_fd == -1) {
2532                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2533                 return -1;
2534         }
2535
2536         state = talloc(ctdb, struct ctdb_check_reclock_state);
2537         CTDB_NO_MEMORY(ctdb, state);
2538
2539         state->ctdb = ctdb;
2540         state->start_time = timeval_current();
2541         state->status = RECLOCK_CHECKING;
2542         state->fd[0] = -1;
2543         state->fd[1] = -1;
2544
2545         ret = pipe(state->fd);
2546         if (ret != 0) {
2547                 talloc_free(state);
2548                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2549                 return -1;
2550         }
2551
2552         state->child = fork();
2553         if (state->child == (pid_t)-1) {
2554                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2555                 close(state->fd[0]);
2556                 state->fd[0] = -1;
2557                 close(state->fd[1]);
2558                 state->fd[1] = -1;
2559                 talloc_free(state);
2560                 return -1;
2561         }
2562
2563         if (state->child == 0) {
2564                 char cc = RECLOCK_OK;
2565                 close(state->fd[0]);
2566                 state->fd[0] = -1;
2567
2568                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2569                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2570                         cc = RECLOCK_FAILED;
2571                 }
2572
2573                 write(state->fd[1], &cc, 1);
2574                 /* make sure we die when our parent dies */
2575                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2576                         sleep(5);
2577                         write(state->fd[1], &cc, 1);
2578                 }
2579                 _exit(0);
2580         }
2581         close(state->fd[1]);
2582         state->fd[1] = -1;
2583         set_close_on_exec(state->fd[0]);
2584
2585         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2586
2587         talloc_set_destructor(state, check_reclock_destructor);
2588
2589         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2590                                     ctdb_check_reclock_timeout, state);
2591         if (state->te == NULL) {
2592                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2593                 talloc_free(state);
2594                 return -1;
2595         }
2596
2597         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2598                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2599                                 reclock_child_handler,
2600                                 (void *)state);
2601
2602         if (state->fde == NULL) {
2603                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2604                 talloc_free(state);
2605                 return -1;
2606         }
2607
2608         while (state->status == RECLOCK_CHECKING) {
2609                 event_loop_once(ctdb->ev);
2610         }
2611
2612         if (state->status == RECLOCK_FAILED) {
2613                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2614                 close(ctdb->recovery_lock_fd);
2615                 ctdb->recovery_lock_fd = -1;
2616                 talloc_free(state);
2617                 return -1;
2618         }
2619
2620         talloc_free(state);
2621         return 0;
2622 }
2623
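/* ask the main ctdb daemon which reclock file (if any) is configured and
   keep our local copy in sync; when the setting changes or is removed we
   close any open recovery lock fd so that the old lock is released */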
2624 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2625 {
2626         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2627         const char *reclockfile;
2628
2629         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2630                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2631                 talloc_free(tmp_ctx);
2632                 return -1;      
2633         }
2634
2635         if (reclockfile == NULL) {
2636                 if (ctdb->recovery_lock_file != NULL) {
2637                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2638                         talloc_free(ctdb->recovery_lock_file);
2639                         ctdb->recovery_lock_file = NULL;
2640                         if (ctdb->recovery_lock_fd != -1) {
2641                                 close(ctdb->recovery_lock_fd);
2642                                 ctdb->recovery_lock_fd = -1;
2643                         }
2644                 }
2645                 ctdb->tunable.verify_recovery_lock = 0;
2646                 talloc_free(tmp_ctx);
2647                 return 0;
2648         }
2649
2650         if (ctdb->recovery_lock_file == NULL) {
2651                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2652                 if (ctdb->recovery_lock_fd != -1) {
2653                         close(ctdb->recovery_lock_fd);
2654                         ctdb->recovery_lock_fd = -1;
2655                 }
2656                 talloc_free(tmp_ctx);
2657                 return 0;
2658         }
2659
2660
2661         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2662                 talloc_free(tmp_ctx);
2663                 return 0;
2664         }
2665
2666         talloc_free(ctdb->recovery_lock_file);
2667         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2668         ctdb->tunable.verify_recovery_lock = 0;
2669         if (ctdb->recovery_lock_fd != -1) {
2670                 close(ctdb->recovery_lock_fd);
2671                 ctdb->recovery_lock_fd = -1;
2672         }
2673
2674         talloc_free(tmp_ctx);
2675         return 0;
2676 }
2677                 
2678 /*
2679   the main monitoring loop
2680  */
2681 static void monitor_cluster(struct ctdb_context *ctdb)
2682 {
2683         uint32_t pnn;
2684         TALLOC_CTX *mem_ctx=NULL;
2685         struct ctdb_node_map *nodemap=NULL;
2686         struct ctdb_node_map *recmaster_nodemap=NULL;
2687         struct ctdb_node_map **remote_nodemaps=NULL;
2688         struct ctdb_vnn_map *vnnmap=NULL;
2689         struct ctdb_vnn_map *remote_vnnmap=NULL;
2690         int32_t debug_level;
2691         int i, j, ret;
2692         struct ctdb_recoverd *rec;
2693
2694         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2695
2696         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2697         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2698
2699         rec->ctdb = ctdb;
2700
2701         rec->priority_time = timeval_current();
2702
2703         /* register a message port for sending memory dumps */
2704         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2705
2706         /* register a message port for recovery elections */
2707         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2708
2709         /* when nodes are disabled/enabled */
2710         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2711
2712         /* when we are asked to push out a flag change */
2713         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2714
2715         /* register a message port for vacuum fetch */
2716         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2717
2718         /* register a message port for reloadnodes  */
2719         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2720
2721         /* register a message port for performing a takeover run */
2722         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2723
2724         /* register a message port for disabling the ip check for a short while */
2725         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2726
2727 again:
2728         if (mem_ctx) {
2729                 talloc_free(mem_ctx);
2730                 mem_ctx = NULL;
2731         }
2732         mem_ctx = talloc_new(ctdb);
2733         if (!mem_ctx) {
2734                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2735                 exit(-1);
2736         }
2737
2738         /* we only check for recovery once every recover_interval seconds */
2739         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2740
2741         /* verify that the main daemon is still running */
2742         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2743                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2744                 exit(-1);
2745         }
2746
2747         /* ping the local daemon to tell it we are alive */
2748         ctdb_ctrl_recd_ping(ctdb);
2749
2750         if (rec->election_timeout) {
2751                 /* an election is in progress */
2752                 goto again;
2753         }
2754
2755         /* read the debug level from the parent and update locally */
2756         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2757         if (ret !=0) {
2758                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2759                 goto again;
2760         }
2761         LogLevel = debug_level;
2762
2763
2764         /* We must check if we need to ban a node here, but we want to do this
2765            as early as possible so we don't wait until we have pulled the node
2766            map from the local node. That's why we use the hardcoded value 20
2767         */
2768         for (i=0; i<ctdb->num_nodes; i++) {
2769                 struct ctdb_banning_state *ban_state;
2770
2771                 if (ctdb->nodes[i]->ban_state == NULL) {
2772                         continue;
2773                 }
2774                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2775                 if (ban_state->count < 20) {
2776                         continue;
2777                 }
2778                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2779                         ctdb->nodes[i]->pnn, ban_state->count,
2780                         ctdb->tunable.recovery_ban_period));
2781                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2782                 ban_state->count = 0;
2783         }
2784
2785         /* get relevant tunables */
2786         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2787         if (ret != 0) {
2788                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2789                 goto again;
2790         }
2791
2792         /* get the current recovery lock file from the server */
2793         if (update_recovery_lock_file(ctdb) != 0) {
2794                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2795                 goto again;
2796         }
2797
2798         /* Make sure that if recovery lock verification becomes disabled,
2799            we close the file
2800         */
2801         if (ctdb->tunable.verify_recovery_lock == 0) {
2802                 if (ctdb->recovery_lock_fd != -1) {
2803                         close(ctdb->recovery_lock_fd);
2804                         ctdb->recovery_lock_fd = -1;
2805                 }
2806         }
2807
2808         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2809         if (pnn == (uint32_t)-1) {
2810                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2811                 goto again;
2812         }
2813
2814         /* get the vnnmap */
2815         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2816         if (ret != 0) {
2817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2818                 goto again;
2819         }
2820
2821
2822         /* get number of nodes */
2823         if (rec->nodemap) {
2824                 talloc_free(rec->nodemap);
2825                 rec->nodemap = NULL;
2826                 nodemap=NULL;
2827         }
2828         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2829         if (ret != 0) {
2830                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2831                 goto again;
2832         }
2833         nodemap = rec->nodemap;
2834
2835         /* check which node is the recovery master */
2836         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2837         if (ret != 0) {
2838                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2839                 goto again;
2840         }
2841
2842         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2843         if (rec->recmaster != pnn) {
2844                 if (rec->ip_reallocate_ctx != NULL) {
2845                         talloc_free(rec->ip_reallocate_ctx);
2846                         rec->ip_reallocate_ctx = NULL;
2847                         rec->reallocate_callers = NULL;
2848                 }
2849         }
2850         /* if there are takeovers requested, perform them and notify the waiters */
2851         if (rec->reallocate_callers) {
2852                 process_ipreallocate_requests(ctdb, rec);
2853         }
2854
2855         if (rec->recmaster == (uint32_t)-1) {
2856                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2857                 force_election(rec, pnn, nodemap);
2858                 goto again;
2859         }
2860
2861
2862         /* if the local daemon is STOPPED, we verify that the databases are
2863            also frozen and that the recmode is set to active
2864         */
2865         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2866                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2867                 if (ret != 0) {
2868                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2869                 }
2870                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2871                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2872
2873                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2874                         if (ret != 0) {
2875                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2876                                 goto again;
2877                         }
2878                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2879                         if (ret != 0) {
2880                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2881
2882                                 goto again;
2883                         }
2884                         goto again;
2885                 }
2886         }
2887         /* If the local node is stopped, verify that we are not the recmaster;
2888            if we are, yield the role
2889         */
2890         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2891                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2892                 force_election(rec, pnn, nodemap);
2893                 goto again;
2894         }
2895         
2896         /* check that we (recovery daemon) and the local ctdb daemon
2897            agree on whether we are banned or not
2898         */
2899 //qqq
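        /* note: this consistency check is not implemented yet; the "qqq"
           marker above is a placeholder left in the source */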
2900
2901         /* remember our own node flags */
2902         rec->node_flags = nodemap->nodes[pnn].flags;
2903
2904         /* count how many active nodes there are */
2905         rec->num_active    = 0;
2906         rec->num_connected = 0;
2907         for (i=0; i<nodemap->num; i++) {
2908                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2909                         rec->num_active++;
2910                 }
2911                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2912                         rec->num_connected++;
2913                 }
2914         }
2915
2916
2917         /* verify that the recmaster node is still active */
2918         for (j=0; j<nodemap->num; j++) {
2919                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2920                         break;
2921                 }
2922         }
2923
2924         if (j == nodemap->num) {
2925                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2926                 force_election(rec, pnn, nodemap);
2927                 goto again;
2928         }
2929
2930         /* if recovery master is disconnected we must elect a new recmaster */
2931         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2932                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2933                 force_election(rec, pnn, nodemap);
2934                 goto again;
2935         }
2936
2937         /* grab the nodemap from the recovery master to check if it is banned */
2938         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2939                                    mem_ctx, &recmaster_nodemap);
2940         if (ret != 0) {
2941                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2942                           nodemap->nodes[j].pnn));
2943                 goto again;
2944         }
2945
2946
2947         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2948                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2949                 force_election(rec, pnn, nodemap);
2950                 goto again;
2951         }
2952
2953
2954         /* verify that we have all ip addresses we should have and we don't
2955          * have addresses we shouldn't have.
2956          */
2957         if (ctdb->do_checkpublicip) {
2958                 if (rec->ip_check_disable_ctx == NULL) {
2959                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
2960                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2961                         }
2962                 }
2963         }
2964
2965
2966         /* if we are not the recmaster then we do not need to check
2967            if recovery is needed
2968          */
2969         if (pnn != rec->recmaster) {
2970                 goto again;
2971         }
2972
2973
2974         /* ensure our local copies of flags are right */
2975         ret = update_local_flags(rec, nodemap);
2976         if (ret == MONITOR_ELECTION_NEEDED) {
2977                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2978                 force_election(rec, pnn, nodemap);
2979                 goto again;
2980         }
2981         if (ret != MONITOR_OK) {
2982                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2983                 goto again;
2984         }
2985
2986         /* update the list of public ips that a node can handle for
2987            all connected nodes
2988         */
2989         if (ctdb->num_nodes != nodemap->num) {
2990                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2991                 reload_nodes_file(ctdb);
2992                 goto again;
2993         }
2994         for (j=0; j<nodemap->num; j++) {
2995                 /* release any existing data */
2996                 if (ctdb->nodes[j]->public_ips) {
2997                         talloc_free(ctdb->nodes[j]->public_ips);
2998                         ctdb->nodes[j]->public_ips = NULL;
2999                 }
3000
3001                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3002                         continue;
3003                 }
3004
3005                 /* grab a new shiny list of public ips from the node */
3006                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
3007                         ctdb->nodes[j]->pnn, 
3008                         ctdb->nodes,
3009                         &ctdb->nodes[j]->public_ips)) {
3010                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
3011                                 ctdb->nodes[j]->pnn));
3012                         goto again;
3013                 }
3014         }
3015
3016
3017         /* verify that all active nodes agree that we are the recmaster */
3018         switch (verify_recmaster(rec, nodemap, pnn)) {
3019         case MONITOR_RECOVERY_NEEDED:
3020                 /* can not happen */
3021                 goto again;
3022         case MONITOR_ELECTION_NEEDED:
3023                 force_election(rec, pnn, nodemap);
3024                 goto again;
3025         case MONITOR_OK:
3026                 break;
3027         case MONITOR_FAILED:
3028                 goto again;
3029         }
3030
3031
3032         if (rec->need_recovery) {
3033                 /* a previous recovery didn't finish */
3034                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3035                 goto again;             
3036         }
3037
3038         /* verify that all active nodes are in normal mode 
3039            and not in recovery mode 
3040         */
3041         switch (verify_recmode(ctdb, nodemap)) {
3042         case MONITOR_RECOVERY_NEEDED:
3043                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3044                 goto again;
3045         case MONITOR_FAILED:
3046                 goto again;
3047         case MONITOR_ELECTION_NEEDED:
3048                 /* can not happen */
3049         case MONITOR_OK:
3050                 break;
3051         }
3052
3053
3054         if (ctdb->tunable.verify_recovery_lock != 0) {
3055                 /* we should have the reclock - check its not stale */
3056                 ret = check_recovery_lock(ctdb);
3057                 if (ret != 0) {
3058                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3059                         ctdb_set_culprit(rec, ctdb->pnn);
3060                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3061                         goto again;
3062                 }
3063         }
3064
3065         /* get the nodemap for all active remote nodes
3066          */
3067         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3068         if (remote_nodemaps == NULL) {
3069                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3070                 goto again;
3071         }
3072         for(i=0; i<nodemap->num; i++) {
3073                 remote_nodemaps[i] = NULL;
3074         }
3075         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3076                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3077                 goto again;
3078         } 
3079
3080         /* verify that all other nodes have the same nodemap as we have
3081         */
3082         for (j=0; j<nodemap->num; j++) {
3083                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3084                         continue;
3085                 }
3086
3087                 if (remote_nodemaps[j] == NULL) {
3088                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3089                         ctdb_set_culprit(rec, j);
3090
3091                         goto again;
3092                 }
3093
3094                 /* if the nodes disagree on how many nodes there are
3095                    then this is a good reason to try recovery
3096                  */
3097                 if (remote_nodemaps[j]->num != nodemap->num) {
3098                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3099                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3100                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3101                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3102                         goto again;
3103                 }
3104
3105                 /* if the nodes disagree on which nodes exist and are
3106                    active, then that is also a good reason to do recovery
3107                  */
3108                 for (i=0;i<nodemap->num;i++) {
3109                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3110                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3111                                           nodemap->nodes[j].pnn, i, 
3112                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3113                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3114                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3115                                             vnnmap);
3116                                 goto again;
3117                         }
3118                 }
3119
3120                 /* verify the flags are consistent
3121                 */
3122                 for (i=0; i<nodemap->num; i++) {
3123                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3124                                 continue;
3125                         }
3126                         
3127                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3128                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3129                                   nodemap->nodes[j].pnn, 
3130                                   nodemap->nodes[i].pnn, 
3131                                   remote_nodemaps[j]->nodes[i].flags,
3132                                   nodemap->nodes[i].flags));
3133                                 if (i == j) {
3134                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3135                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3136                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3137                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3138                                                     vnnmap);
3139                                         goto again;
3140                                 } else {
3141                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3142                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3143                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3144                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3145                                                     vnnmap);
3146                                         goto again;
3147                                 }
3148                         }
3149                 }
3150         }
3151
3152
3153         /* there had better be the same number of lmasters in the vnnmap
3154            as there are active nodes, or we will have to do a recovery
3155          */
3156         if (vnnmap->size != rec->num_active) {
3157                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3158                           vnnmap->size, rec->num_active));
3159                 ctdb_set_culprit(rec, ctdb->pnn);
3160                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3161                 goto again;
3162         }
3163
3164         /* verify that all active nodes in the nodemap also exist in 
3165            the vnnmap.
3166          */
3167         for (j=0; j<nodemap->num; j++) {
3168                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3169                         continue;
3170                 }
3171                 if (nodemap->nodes[j].pnn == pnn) {
3172                         continue;
3173                 }
3174
3175                 for (i=0; i<vnnmap->size; i++) {
3176                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3177                                 break;
3178                         }
3179                 }
3180                 if (i == vnnmap->size) {
3181                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3182                                   nodemap->nodes[j].pnn));
3183                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3184                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3185                         goto again;
3186                 }
3187         }
3188
3189         
3190         /* verify that all other nodes have the same vnnmap
3191            and are from the same generation
3192          */
3193         for (j=0; j<nodemap->num; j++) {
3194                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3195                         continue;
3196                 }
3197                 if (nodemap->nodes[j].pnn == pnn) {
3198                         continue;
3199                 }
3200
3201                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3202                                           mem_ctx, &remote_vnnmap);
3203                 if (ret != 0) {
3204                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3205                                   nodemap->nodes[j].pnn));
3206                         goto again;
3207                 }
3208
3209                 /* verify the vnnmap generation is the same */
3210                 if (vnnmap->generation != remote_vnnmap->generation) {
3211                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3212                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3213                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3214                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3215                         goto again;
3216                 }
3217
3218                 /* verify the vnnmap size is the same */
3219                 if (vnnmap->size != remote_vnnmap->size) {
3220                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3221                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3222                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3223                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3224                         goto again;
3225                 }
3226
3227                 /* verify the vnnmap is the same */
3228                 for (i=0;i<vnnmap->size;i++) {
3229                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3230                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3231                                           nodemap->nodes[j].pnn));
3232                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3233                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3234                                             vnnmap);
3235                                 goto again;
3236                         }
3237                 }
3238         }
3239
3240         /* we might need to change who has what IP assigned */
3241         if (rec->need_takeover_run) {
3242                 rec->need_takeover_run = false;
3243
3244                 /* execute the "startrecovery" event script on all nodes */
3245                 ret = run_startrecovery_eventscript(rec, nodemap);
3246                 if (ret!=0) {
3247                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3248                         ctdb_set_culprit(rec, ctdb->pnn);
3249                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3250                 }
3251
3252                 ret = ctdb_takeover_run(ctdb, nodemap);
3253                 if (ret != 0) {
3254                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3255                         ctdb_set_culprit(rec, ctdb->pnn);
3256                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3257                 }
3258
3259                 /* execute the "recovered" event script on all nodes */
3260                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3261 #if 0
3262 // we can't check whether the event completed successfully
3263 // since this script WILL fail if the node is in recovery mode,
3264 // and if that race happens, the code here would just cause a second
3265 // cascading recovery.
3266                 if (ret!=0) {
3267                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3268                         ctdb_set_culprit(rec, ctdb->pnn);
3269                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3270                 }
3271 #endif
3272         }
3273
3274
3275         goto again;
3276
3277 }
3278
3279 /*
3280   event handler for when the main ctdbd dies
3281  */
3282 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3283                                  uint16_t flags, void *private_data)
3284 {
3285         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3286         _exit(1);
3287 }
3288
3289 /*
3290   called regularly to verify that the recovery daemon is still running
3291  */
3292 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3293                               struct timeval yt, void *p)
3294 {
3295         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3296
3297         if (kill(ctdb->recoverd_pid, 0) != 0) {
3298                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3299
3300                 ctdb_stop_recoverd(ctdb);
3301                 ctdb_stop_keepalive(ctdb);
3302                 ctdb_stop_monitoring(ctdb);
3303                 ctdb_release_all_ips(ctdb);
3304                 if (ctdb->methods != NULL) {
3305                         ctdb->methods->shutdown(ctdb);
3306                 }
3307                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3308
3309                 exit(10);       
3310         }
3311
3312         event_add_timed(ctdb->ev, ctdb, 
3313                         timeval_current_ofs(30, 0),
3314                         ctdb_check_recd, ctdb);
3315 }
3316
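/*
  SIGCHLD handler for the recovery daemon: reap any exited child
  processes (using WNOHANG) so they do not remain as zombies
 */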
3317 static void recd_sig_child_handler(struct event_context *ev,
3318         struct signal_event *se, int signum, int count,
3319         void *dont_care, 
3320         void *private_data)
3321 {
3322 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3323         int status;
3324         pid_t pid = -1;
3325
3326         while (pid != 0) {
3327                 pid = waitpid(-1, &status, WNOHANG);
3328                 if (pid == -1) {
3329                         if (errno != ECHILD) {
3330                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3331                         }
3332                         return;
3333                 }
3334                 if (pid > 0) {
3335                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3336                 }
3337         }
3338 }
3339
3340 /*
3341   startup the recovery daemon as a child of the main ctdb daemon
3342  */
3343 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3344 {
3345         int fd[2];
3346         struct signal_event *se;
3347
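        /* the pipe is used only to detect the death of the main daemon:
           the parent keeps the write end open and the child gets an event
           (EOF) on the read end when the parent exits */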
3348         if (pipe(fd) != 0) {
3349                 return -1;
3350         }
3351
3352         ctdb->ctdbd_pid = getpid();
3353
3354         ctdb->recoverd_pid = fork();
3355         if (ctdb->recoverd_pid == -1) {
3356                 return -1;
3357         }
3358         
3359         if (ctdb->recoverd_pid != 0) {
3360                 close(fd[0]);
3361                 event_add_timed(ctdb->ev, ctdb, 
3362                                 timeval_current_ofs(30, 0),
3363                                 ctdb_check_recd, ctdb);
3364                 return 0;
3365         }
3366
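        /* from here on we are the child: close the parent's end of the pipe,
           switch into client mode and run the monitoring loop forever */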
3367         close(fd[1]);
3368
3369         srandom(getpid() ^ time(NULL));
3370
3371         if (switch_from_server_to_client(ctdb) != 0) {
3372                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3373                 exit(1);
3374         }
3375
3376         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3377
3378         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3379                      ctdb_recoverd_parent, &fd[0]);     
3380
3381         /* set up a handler to pick up sigchld */
3382         se = event_add_signal(ctdb->ev, ctdb,
3383                                      SIGCHLD, 0,
3384                                      recd_sig_child_handler,
3385                                      ctdb);
3386         if (se == NULL) {
3387                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3388                 exit(1);
3389         }
3390
3391         monitor_cluster(ctdb);
3392
3393         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3394         return -1;
3395 }
3396
3397 /*
3398   shutdown the recovery daemon
3399  */
3400 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3401 {
3402         if (ctdb->recoverd_pid == 0) {
3403                 return;
3404         }
3405
3406         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3407         kill(ctdb->recoverd_pid, SIGTERM);
3408 }