d56fdb5fd6b5e503c885c58f77a85ea94f898856
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
90
91         if (!ctdb_validate_pnn(ctdb, pnn)) {
92                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
93                 return;
94         }
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   run the "recovered" eventscript on all nodes
112  */
113 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
114 {
115         TALLOC_CTX *tmp_ctx;
116         uint32_t *nodes;
117
118         tmp_ctx = talloc_new(ctdb);
119         CTDB_NO_MEMORY(ctdb, tmp_ctx);
120
121         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
122         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
123                                         nodes, 0,
124                                         CONTROL_TIMEOUT(), false, tdb_null,
125                                         NULL, NULL,
126                                         NULL) != 0) {
127                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
128
129                 talloc_free(tmp_ctx);
130                 return -1;
131         }
132
133         talloc_free(tmp_ctx);
134         return 0;
135 }
136
137 /*
138   remember the trouble maker
139  */
140 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
141 {
142         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
143         struct ctdb_banning_state *ban_state;
144
145         if (culprit > ctdb->num_nodes) {
146                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
147                 return;
148         }
149
150         if (ctdb->nodes[culprit]->ban_state == NULL) {
151                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
152                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
153
154                 
155         }
156         ban_state = ctdb->nodes[culprit]->ban_state;
157         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
158                 /* this was the first time in a long while this node
159                    misbehaved so we will forgive any old transgressions.
160                 */
161                 ban_state->count = 0;
162         }
163
164         ban_state->count += count;
165         ban_state->last_reported_time = timeval_current();
166         rec->last_culprit_node = culprit;
167 }
168
169 /*
170   remember the trouble maker
171  */
172 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
173 {
174         ctdb_set_culprit_count(rec, culprit, 1);
175 }
176
177
178 /* this callback is called for every node that failed to execute the
179    start recovery event
180 */
181 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
182 {
183         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
184
185         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
186
187         ctdb_set_culprit(rec, node_pnn);
188 }
189
190 /*
191   run the "startrecovery" eventscript on all nodes
192  */
193 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
194 {
195         TALLOC_CTX *tmp_ctx;
196         uint32_t *nodes;
197         struct ctdb_context *ctdb = rec->ctdb;
198
199         tmp_ctx = talloc_new(ctdb);
200         CTDB_NO_MEMORY(ctdb, tmp_ctx);
201
202         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
204                                         nodes, 0,
205                                         CONTROL_TIMEOUT(), false, tdb_null,
206                                         NULL,
207                                         startrecovery_fail_callback,
208                                         rec) != 0) {
209                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
210                 talloc_free(tmp_ctx);
211                 return -1;
212         }
213
214         talloc_free(tmp_ctx);
215         return 0;
216 }
217
218 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
219 {
220         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
221                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
222                 return;
223         }
224         if (node_pnn < ctdb->num_nodes) {
225                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
226         }
227
228         if (node_pnn == ctdb->pnn) {
229                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
230         }
231 }
232
233 /*
234   update the node capabilities for all connected nodes
235  */
236 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
237 {
238         uint32_t *nodes;
239         TALLOC_CTX *tmp_ctx;
240
241         tmp_ctx = talloc_new(ctdb);
242         CTDB_NO_MEMORY(ctdb, tmp_ctx);
243
244         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
245         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
246                                         nodes, 0,
247                                         CONTROL_TIMEOUT(),
248                                         false, tdb_null,
249                                         async_getcap_callback, NULL,
250                                         NULL) != 0) {
251                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
252                 talloc_free(tmp_ctx);
253                 return -1;
254         }
255
256         talloc_free(tmp_ctx);
257         return 0;
258 }
259
260 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
261 {
262         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
263
264         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
265         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
266 }
267
268 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
269 {
270         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
271
272         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
273         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
274 }
275
276 /*
277   change recovery mode on all nodes
278  */
279 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
280 {
281         TDB_DATA data;
282         uint32_t *nodes;
283         TALLOC_CTX *tmp_ctx;
284
285         tmp_ctx = talloc_new(ctdb);
286         CTDB_NO_MEMORY(ctdb, tmp_ctx);
287
288         /* freeze all nodes */
289         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
290         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
291                 int i;
292
293                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
294                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
295                                                 nodes, i,
296                                                 CONTROL_TIMEOUT(),
297                                                 false, tdb_null,
298                                                 NULL,
299                                                 set_recmode_fail_callback,
300                                                 rec) != 0) {
301                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
302                                 talloc_free(tmp_ctx);
303                                 return -1;
304                         }
305                 }
306         }
307
308
309         data.dsize = sizeof(uint32_t);
310         data.dptr = (unsigned char *)&rec_mode;
311
312         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
313                                         nodes, 0,
314                                         CONTROL_TIMEOUT(),
315                                         false, data,
316                                         NULL, NULL,
317                                         NULL) != 0) {
318                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
319                 talloc_free(tmp_ctx);
320                 return -1;
321         }
322
323         talloc_free(tmp_ctx);
324         return 0;
325 }
326
327 /*
328   change recovery master on all node
329  */
330 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
331 {
332         TDB_DATA data;
333         TALLOC_CTX *tmp_ctx;
334         uint32_t *nodes;
335
336         tmp_ctx = talloc_new(ctdb);
337         CTDB_NO_MEMORY(ctdb, tmp_ctx);
338
339         data.dsize = sizeof(uint32_t);
340         data.dptr = (unsigned char *)&pnn;
341
342         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
343         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
344                                         nodes, 0,
345                                         CONTROL_TIMEOUT(), false, data,
346                                         NULL, NULL,
347                                         NULL) != 0) {
348                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
349                 talloc_free(tmp_ctx);
350                 return -1;
351         }
352
353         talloc_free(tmp_ctx);
354         return 0;
355 }
356
357 /* update all remote nodes to use the same db priority that we have
358    this can fail if the remove node has not yet been upgraded to 
359    support this function, so we always return success and never fail
360    a recovery if this call fails.
361 */
362 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
363         struct ctdb_node_map *nodemap, 
364         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
365 {
366         int db;
367         uint32_t *nodes;
368
369         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
370
371         /* step through all local databases */
372         for (db=0; db<dbmap->num;db++) {
373                 TDB_DATA data;
374                 struct ctdb_db_priority db_prio;
375                 int ret;
376
377                 db_prio.db_id     = dbmap->dbs[db].dbid;
378                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
379                 if (ret != 0) {
380                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
381                         continue;
382                 }
383
384                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
385
386                 data.dptr  = (uint8_t *)&db_prio;
387                 data.dsize = sizeof(db_prio);
388
389                 if (ctdb_client_async_control(ctdb,
390                                         CTDB_CONTROL_SET_DB_PRIORITY,
391                                         nodes, 0,
392                                         CONTROL_TIMEOUT(), false, data,
393                                         NULL, NULL,
394                                         NULL) != 0) {
395                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
396                 }
397         }
398
399         return 0;
400 }                       
401
402 /*
403   ensure all other nodes have attached to any databases that we have
404  */
405 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
406                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
407 {
408         int i, j, db, ret;
409         struct ctdb_dbid_map *remote_dbmap;
410
411         /* verify that all other nodes have all our databases */
412         for (j=0; j<nodemap->num; j++) {
413                 /* we dont need to ourself ourselves */
414                 if (nodemap->nodes[j].pnn == pnn) {
415                         continue;
416                 }
417                 /* dont check nodes that are unavailable */
418                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
419                         continue;
420                 }
421
422                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
423                                          mem_ctx, &remote_dbmap);
424                 if (ret != 0) {
425                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
426                         return -1;
427                 }
428
429                 /* step through all local databases */
430                 for (db=0; db<dbmap->num;db++) {
431                         const char *name;
432
433
434                         for (i=0;i<remote_dbmap->num;i++) {
435                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
436                                         break;
437                                 }
438                         }
439                         /* the remote node already have this database */
440                         if (i!=remote_dbmap->num) {
441                                 continue;
442                         }
443                         /* ok so we need to create this database */
444                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
445                                             mem_ctx, &name);
446                         if (ret != 0) {
447                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
448                                 return -1;
449                         }
450                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
451                                            mem_ctx, name,
452                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
453                         if (ret != 0) {
454                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
455                                 return -1;
456                         }
457                 }
458         }
459
460         return 0;
461 }
462
463
464 /*
465   ensure we are attached to any databases that anyone else is attached to
466  */
467 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
468                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
469 {
470         int i, j, db, ret;
471         struct ctdb_dbid_map *remote_dbmap;
472
473         /* verify that we have all database any other node has */
474         for (j=0; j<nodemap->num; j++) {
475                 /* we dont need to ourself ourselves */
476                 if (nodemap->nodes[j].pnn == pnn) {
477                         continue;
478                 }
479                 /* dont check nodes that are unavailable */
480                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
481                         continue;
482                 }
483
484                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
485                                          mem_ctx, &remote_dbmap);
486                 if (ret != 0) {
487                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
488                         return -1;
489                 }
490
491                 /* step through all databases on the remote node */
492                 for (db=0; db<remote_dbmap->num;db++) {
493                         const char *name;
494
495                         for (i=0;i<(*dbmap)->num;i++) {
496                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
497                                         break;
498                                 }
499                         }
500                         /* we already have this db locally */
501                         if (i!=(*dbmap)->num) {
502                                 continue;
503                         }
504                         /* ok so we need to create this database and
505                            rebuild dbmap
506                          */
507                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
508                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
511                                           nodemap->nodes[j].pnn));
512                                 return -1;
513                         }
514                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
515                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
516                         if (ret != 0) {
517                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
518                                 return -1;
519                         }
520                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
521                         if (ret != 0) {
522                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
523                                 return -1;
524                         }
525                 }
526         }
527
528         return 0;
529 }
530
531
532 /*
533   pull the remote database contents from one node into the recdb
534  */
535 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
536                                     struct tdb_wrap *recdb, uint32_t dbid)
537 {
538         int ret;
539         TDB_DATA outdata;
540         struct ctdb_marshall_buffer *reply;
541         struct ctdb_rec_data *rec;
542         int i;
543         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
544
545         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
546                                CONTROL_TIMEOUT(), &outdata);
547         if (ret != 0) {
548                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552
553         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
554
555         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
556                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
557                 talloc_free(tmp_ctx);
558                 return -1;
559         }
560         
561         rec = (struct ctdb_rec_data *)&reply->data[0];
562         
563         for (i=0;
564              i<reply->count;
565              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
566                 TDB_DATA key, data;
567                 struct ctdb_ltdb_header *hdr;
568                 TDB_DATA existing;
569                 
570                 key.dptr = &rec->data[0];
571                 key.dsize = rec->keylen;
572                 data.dptr = &rec->data[key.dsize];
573                 data.dsize = rec->datalen;
574                 
575                 hdr = (struct ctdb_ltdb_header *)data.dptr;
576
577                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
578                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
579                         talloc_free(tmp_ctx);
580                         return -1;
581                 }
582
583                 /* fetch the existing record, if any */
584                 existing = tdb_fetch(recdb->tdb, key);
585                 
586                 if (existing.dptr != NULL) {
587                         struct ctdb_ltdb_header header;
588                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
589                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
590                                          (unsigned)existing.dsize, srcnode));
591                                 free(existing.dptr);
592                                 talloc_free(tmp_ctx);
593                                 return -1;
594                         }
595                         header = *(struct ctdb_ltdb_header *)existing.dptr;
596                         free(existing.dptr);
597                         if (!(header.rsn < hdr->rsn ||
598                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
599                                 continue;
600                         }
601                 }
602                 
603                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
604                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
605                         talloc_free(tmp_ctx);
606                         return -1;                              
607                 }
608         }
609
610         talloc_free(tmp_ctx);
611
612         return 0;
613 }
614
615
616 struct pull_seqnum_cbdata {
617         int failed;
618         uint32_t pnn;
619         uint64_t seqnum;
620 };
621
622 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
623 {
624         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
625         uint64_t seqnum;
626
627         if (cb_data->failed != 0) {
628                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
629                 return;
630         }
631
632         if (res != 0) {
633                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
634                 cb_data->failed = 1;
635                 return;
636         }
637
638         if (outdata.dsize != sizeof(uint64_t)) {
639                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
640                 cb_data->failed = -1;
641                 return;
642         }
643
644         seqnum = *((uint64_t *)outdata.dptr);
645
646         if (seqnum > cb_data->seqnum) {
647                 cb_data->seqnum = seqnum;
648                 cb_data->pnn = node_pnn;
649         }
650 }
651
652 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
653 {
654         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
655
656         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
657         cb_data->failed = 1;
658 }
659
660 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
661                                 struct ctdb_recoverd *rec, 
662                                 struct ctdb_node_map *nodemap, 
663                                 struct tdb_wrap *recdb, uint32_t dbid)
664 {
665         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
666         uint32_t *nodes;
667         TDB_DATA data;
668         uint32_t outdata[2];
669         struct pull_seqnum_cbdata *cb_data;
670
671         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
672
673         outdata[0] = dbid;
674         outdata[1] = 0;
675
676         data.dsize = sizeof(outdata);
677         data.dptr  = (uint8_t *)&outdata[0];
678
679         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
680         if (cb_data == NULL) {
681                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
682                 talloc_free(tmp_ctx);
683                 return -1;
684         }
685
686         cb_data->failed = 0;
687         cb_data->pnn    = -1;
688         cb_data->seqnum = 0;
689         
690         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
691         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
692                                         nodes, 0,
693                                         CONTROL_TIMEOUT(), false, data,
694                                         pull_seqnum_cb,
695                                         pull_seqnum_fail_cb,
696                                         cb_data) != 0) {
697                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
698
699                 talloc_free(tmp_ctx);
700                 return -1;
701         }
702
703         if (cb_data->failed != 0) {
704                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
705                 talloc_free(tmp_ctx);
706                 return -1;
707         }
708
709         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
710                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
711                 talloc_free(tmp_ctx);
712                 return -1;
713         }
714
715         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
716
717         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
718                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
719                 talloc_free(tmp_ctx);
720                 return -1;
721         }
722
723         talloc_free(tmp_ctx);
724         return 0;
725 }
726
727
728 /*
729   pull all the remote database contents into the recdb
730  */
731 static int pull_remote_database(struct ctdb_context *ctdb,
732                                 struct ctdb_recoverd *rec, 
733                                 struct ctdb_node_map *nodemap, 
734                                 struct tdb_wrap *recdb, uint32_t dbid,
735                                 bool persistent)
736 {
737         int j;
738
739         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
740                 int ret;
741                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
742                 if (ret == 0) {
743                         return 0;
744                 }
745         }
746
747         /* pull all records from all other nodes across onto this node
748            (this merges based on rsn)
749         */
750         for (j=0; j<nodemap->num; j++) {
751                 /* dont merge from nodes that are unavailable */
752                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
753                         continue;
754                 }
755                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
756                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
757                                  nodemap->nodes[j].pnn));
758                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
759                         return -1;
760                 }
761         }
762         
763         return 0;
764 }
765
766
767 /*
768   update flags on all active nodes
769  */
770 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
771 {
772         int ret;
773
774         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
775                 if (ret != 0) {
776                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
777                 return -1;
778         }
779
780         return 0;
781 }
782
783 /*
784   ensure all nodes have the same vnnmap we do
785  */
786 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
787                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
788 {
789         int j, ret;
790
791         /* push the new vnn map out to all the nodes */
792         for (j=0; j<nodemap->num; j++) {
793                 /* dont push to nodes that are unavailable */
794                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
795                         continue;
796                 }
797
798                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
801                         return -1;
802                 }
803         }
804
805         return 0;
806 }
807
808
809 struct vacuum_info {
810         struct vacuum_info *next, *prev;
811         struct ctdb_recoverd *rec;
812         uint32_t srcnode;
813         struct ctdb_db_context *ctdb_db;
814         struct ctdb_marshall_buffer *recs;
815         struct ctdb_rec_data *r;
816 };
817
818 static void vacuum_fetch_next(struct vacuum_info *v);
819
820 /*
821   called when a vacuum fetch has completed - just free it and do the next one
822  */
823 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
824 {
825         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
826         talloc_free(state);
827         vacuum_fetch_next(v);
828 }
829
830
831 /*
832   process the next element from the vacuum list
833 */
834 static void vacuum_fetch_next(struct vacuum_info *v)
835 {
836         struct ctdb_call call;
837         struct ctdb_rec_data *r;
838
839         while (v->recs->count) {
840                 struct ctdb_client_call_state *state;
841                 TDB_DATA data;
842                 struct ctdb_ltdb_header *hdr;
843
844                 ZERO_STRUCT(call);
845                 call.call_id = CTDB_NULL_FUNC;
846                 call.flags = CTDB_IMMEDIATE_MIGRATION;
847                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
848
849                 r = v->r;
850                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
851                 v->recs->count--;
852
853                 call.key.dptr = &r->data[0];
854                 call.key.dsize = r->keylen;
855
856                 /* ensure we don't block this daemon - just skip a record if we can't get
857                    the chainlock */
858                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
859                         continue;
860                 }
861
862                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
863                 if (data.dptr == NULL) {
864                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
865                         continue;
866                 }
867
868                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
869                         free(data.dptr);
870                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
871                         continue;
872                 }
873                 
874                 hdr = (struct ctdb_ltdb_header *)data.dptr;
875                 if (hdr->dmaster == v->rec->ctdb->pnn) {
876                         /* its already local */
877                         free(data.dptr);
878                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
879                         continue;
880                 }
881
882                 free(data.dptr);
883
884                 state = ctdb_call_send(v->ctdb_db, &call);
885                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
886                 if (state == NULL) {
887                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
888                         talloc_free(v);
889                         return;
890                 }
891                 state->async.fn = vacuum_fetch_callback;
892                 state->async.private_data = v;
893                 return;
894         }
895
896         talloc_free(v);
897 }
898
899
900 /*
901   destroy a vacuum info structure
902  */
903 static int vacuum_info_destructor(struct vacuum_info *v)
904 {
905         DLIST_REMOVE(v->rec->vacuum_info, v);
906         return 0;
907 }
908
909
910 /*
911   handler for vacuum fetch
912 */
913 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
914                                  TDB_DATA data, void *private_data)
915 {
916         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
917         struct ctdb_marshall_buffer *recs;
918         int ret, i;
919         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
920         const char *name;
921         struct ctdb_dbid_map *dbmap=NULL;
922         bool persistent = false;
923         struct ctdb_db_context *ctdb_db;
924         struct ctdb_rec_data *r;
925         uint32_t srcnode;
926         struct vacuum_info *v;
927
928         recs = (struct ctdb_marshall_buffer *)data.dptr;
929         r = (struct ctdb_rec_data *)&recs->data[0];
930
931         if (recs->count == 0) {
932                 talloc_free(tmp_ctx);
933                 return;
934         }
935
936         srcnode = r->reqid;
937
938         for (v=rec->vacuum_info;v;v=v->next) {
939                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
940                         /* we're already working on records from this node */
941                         talloc_free(tmp_ctx);
942                         return;
943                 }
944         }
945
946         /* work out if the database is persistent */
947         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
948         if (ret != 0) {
949                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
950                 talloc_free(tmp_ctx);
951                 return;
952         }
953
954         for (i=0;i<dbmap->num;i++) {
955                 if (dbmap->dbs[i].dbid == recs->db_id) {
956                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
957                         break;
958                 }
959         }
960         if (i == dbmap->num) {
961                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
962                 talloc_free(tmp_ctx);
963                 return;         
964         }
965
966         /* find the name of this database */
967         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
968                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         /* attach to it */
974         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
975         if (ctdb_db == NULL) {
976                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
977                 talloc_free(tmp_ctx);
978                 return;
979         }
980
981         v = talloc_zero(rec, struct vacuum_info);
982         if (v == NULL) {
983                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
984                 talloc_free(tmp_ctx);
985                 return;
986         }
987
988         v->rec = rec;
989         v->srcnode = srcnode;
990         v->ctdb_db = ctdb_db;
991         v->recs = talloc_memdup(v, recs, data.dsize);
992         if (v->recs == NULL) {
993                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
994                 talloc_free(v);
995                 talloc_free(tmp_ctx);
996                 return;         
997         }
998         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
999
1000         DLIST_ADD(rec->vacuum_info, v);
1001
1002         talloc_set_destructor(v, vacuum_info_destructor);
1003
1004         vacuum_fetch_next(v);
1005         talloc_free(tmp_ctx);
1006 }
1007
1008
1009 /*
1010   called when ctdb_wait_timeout should finish
1011  */
1012 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1013                               struct timeval yt, void *p)
1014 {
1015         uint32_t *timed_out = (uint32_t *)p;
1016         (*timed_out) = 1;
1017 }
1018
1019 /*
1020   wait for a given number of seconds
1021  */
1022 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1023 {
1024         uint32_t timed_out = 0;
1025         time_t usecs = (secs - (time_t)secs) * 1000000;
1026         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1027         while (!timed_out) {
1028                 event_loop_once(ctdb->ev);
1029         }
1030 }
1031
1032 /*
1033   called when an election times out (ends)
1034  */
1035 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1036                                   struct timeval t, void *p)
1037 {
1038         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1039         rec->election_timeout = NULL;
1040         fast_start = false;
1041
1042         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1043 }
1044
1045
1046 /*
1047   wait for an election to finish. It finished election_timeout seconds after
1048   the last election packet is received
1049  */
1050 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1051 {
1052         struct ctdb_context *ctdb = rec->ctdb;
1053         while (rec->election_timeout) {
1054                 event_loop_once(ctdb->ev);
1055         }
1056 }
1057
1058 /*
1059   Update our local flags from all remote connected nodes. 
1060   This is only run when we are or we belive we are the recovery master
1061  */
1062 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1063 {
1064         int j;
1065         struct ctdb_context *ctdb = rec->ctdb;
1066         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1067
1068         /* get the nodemap for all active remote nodes and verify
1069            they are the same as for this node
1070          */
1071         for (j=0; j<nodemap->num; j++) {
1072                 struct ctdb_node_map *remote_nodemap=NULL;
1073                 int ret;
1074
1075                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1076                         continue;
1077                 }
1078                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1079                         continue;
1080                 }
1081
1082                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1083                                            mem_ctx, &remote_nodemap);
1084                 if (ret != 0) {
1085                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1086                                   nodemap->nodes[j].pnn));
1087                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1088                         talloc_free(mem_ctx);
1089                         return MONITOR_FAILED;
1090                 }
1091                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1092                         /* We should tell our daemon about this so it
1093                            updates its flags or else we will log the same 
1094                            message again in the next iteration of recovery.
1095                            Since we are the recovery master we can just as
1096                            well update the flags on all nodes.
1097                         */
1098                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1099                         if (ret != 0) {
1100                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1101                                 return -1;
1102                         }
1103
1104                         /* Update our local copy of the flags in the recovery
1105                            daemon.
1106                         */
1107                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1108                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1109                                  nodemap->nodes[j].flags));
1110                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1111                 }
1112                 talloc_free(remote_nodemap);
1113         }
1114         talloc_free(mem_ctx);
1115         return MONITOR_OK;
1116 }
1117
1118
1119 /* Create a new random generation ip. 
1120    The generation id can not be the INVALID_GENERATION id
1121 */
1122 static uint32_t new_generation(void)
1123 {
1124         uint32_t generation;
1125
1126         while (1) {
1127                 generation = random();
1128
1129                 if (generation != INVALID_GENERATION) {
1130                         break;
1131                 }
1132         }
1133
1134         return generation;
1135 }
1136
1137
1138 /*
1139   create a temporary working database
1140  */
1141 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1142 {
1143         char *name;
1144         struct tdb_wrap *recdb;
1145         unsigned tdb_flags;
1146
1147         /* open up the temporary recovery database */
1148         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1149                                ctdb->db_directory_state,
1150                                ctdb->pnn);
1151         if (name == NULL) {
1152                 return NULL;
1153         }
1154         unlink(name);
1155
1156         tdb_flags = TDB_NOLOCK;
1157         if (ctdb->valgrinding) {
1158                 tdb_flags |= TDB_NOMMAP;
1159         }
1160         tdb_flags |= TDB_DISALLOW_NESTING;
1161
1162         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1163                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1164         if (recdb == NULL) {
1165                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1166         }
1167
1168         talloc_free(name);
1169
1170         return recdb;
1171 }
1172
1173
1174 /* 
1175    a traverse function for pulling all relevent records from recdb
1176  */
1177 struct recdb_data {
1178         struct ctdb_context *ctdb;
1179         struct ctdb_marshall_buffer *recdata;
1180         uint32_t len;
1181         bool failed;
1182         bool persistent;
1183 };
1184
1185 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1186 {
1187         struct recdb_data *params = (struct recdb_data *)p;
1188         struct ctdb_rec_data *rec;
1189         struct ctdb_ltdb_header *hdr;
1190
1191         /* skip empty records */
1192         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1193                 return 0;
1194         }
1195
1196         /* update the dmaster field to point to us */
1197         hdr = (struct ctdb_ltdb_header *)data.dptr;
1198         if (!params->persistent) {
1199                 hdr->dmaster = params->ctdb->pnn;
1200                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1201         }
1202
1203         /* add the record to the blob ready to send to the nodes */
1204         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1205         if (rec == NULL) {
1206                 params->failed = true;
1207                 return -1;
1208         }
1209         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1210         if (params->recdata == NULL) {
1211                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1212                          rec->length + params->len, params->recdata->count));
1213                 params->failed = true;
1214                 return -1;
1215         }
1216         params->recdata->count++;
1217         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1218         params->len += rec->length;
1219         talloc_free(rec);
1220
1221         return 0;
1222 }
1223
1224 /*
1225   push the recdb database out to all nodes
1226  */
1227 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1228                                bool persistent,
1229                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1230 {
1231         struct recdb_data params;
1232         struct ctdb_marshall_buffer *recdata;
1233         TDB_DATA outdata;
1234         TALLOC_CTX *tmp_ctx;
1235         uint32_t *nodes;
1236
1237         tmp_ctx = talloc_new(ctdb);
1238         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1239
1240         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1241         CTDB_NO_MEMORY(ctdb, recdata);
1242
1243         recdata->db_id = dbid;
1244
1245         params.ctdb = ctdb;
1246         params.recdata = recdata;
1247         params.len = offsetof(struct ctdb_marshall_buffer, data);
1248         params.failed = false;
1249         params.persistent = persistent;
1250
1251         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1252                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1253                 talloc_free(params.recdata);
1254                 talloc_free(tmp_ctx);
1255                 return -1;
1256         }
1257
1258         if (params.failed) {
1259                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1260                 talloc_free(params.recdata);
1261                 talloc_free(tmp_ctx);
1262                 return -1;              
1263         }
1264
1265         recdata = params.recdata;
1266
1267         outdata.dptr = (void *)recdata;
1268         outdata.dsize = params.len;
1269
1270         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1271         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1272                                         nodes, 0,
1273                                         CONTROL_TIMEOUT(), false, outdata,
1274                                         NULL, NULL,
1275                                         NULL) != 0) {
1276                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1277                 talloc_free(recdata);
1278                 talloc_free(tmp_ctx);
1279                 return -1;
1280         }
1281
1282         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1283                   dbid, recdata->count));
1284
1285         talloc_free(recdata);
1286         talloc_free(tmp_ctx);
1287
1288         return 0;
1289 }
1290
1291
1292 /*
1293   go through a full recovery on one database 
1294  */
1295 static int recover_database(struct ctdb_recoverd *rec, 
1296                             TALLOC_CTX *mem_ctx,
1297                             uint32_t dbid,
1298                             bool persistent,
1299                             uint32_t pnn, 
1300                             struct ctdb_node_map *nodemap,
1301                             uint32_t transaction_id)
1302 {
1303         struct tdb_wrap *recdb;
1304         int ret;
1305         struct ctdb_context *ctdb = rec->ctdb;
1306         TDB_DATA data;
1307         struct ctdb_control_wipe_database w;
1308         uint32_t *nodes;
1309
1310         recdb = create_recdb(ctdb, mem_ctx);
1311         if (recdb == NULL) {
1312                 return -1;
1313         }
1314
1315         /* pull all remote databases onto the recdb */
1316         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1317         if (ret != 0) {
1318                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1319                 return -1;
1320         }
1321
1322         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1323
1324         /* wipe all the remote databases. This is safe as we are in a transaction */
1325         w.db_id = dbid;
1326         w.transaction_id = transaction_id;
1327
1328         data.dptr = (void *)&w;
1329         data.dsize = sizeof(w);
1330
1331         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1333                                         nodes, 0,
1334                                         CONTROL_TIMEOUT(), false, data,
1335                                         NULL, NULL,
1336                                         NULL) != 0) {
1337                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1338                 talloc_free(recdb);
1339                 return -1;
1340         }
1341         
1342         /* push out the correct database. This sets the dmaster and skips 
1343            the empty records */
1344         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1345         if (ret != 0) {
1346                 talloc_free(recdb);
1347                 return -1;
1348         }
1349
1350         /* all done with this database */
1351         talloc_free(recdb);
1352
1353         return 0;
1354 }
1355
1356 /*
1357   reload the nodes file 
1358 */
1359 static void reload_nodes_file(struct ctdb_context *ctdb)
1360 {
1361         ctdb->nodes = NULL;
1362         ctdb_load_nodes_file(ctdb);
1363 }
1364
1365 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1366                                          struct ctdb_recoverd *rec,
1367                                          struct ctdb_node_map *nodemap,
1368                                          uint32_t *culprit)
1369 {
1370         int j;
1371         int ret;
1372
1373         if (ctdb->num_nodes != nodemap->num) {
1374                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1375                                   ctdb->num_nodes, nodemap->num));
1376                 if (culprit) {
1377                         *culprit = ctdb->pnn;
1378                 }
1379                 return -1;
1380         }
1381
1382         for (j=0; j<nodemap->num; j++) {
1383                 /* release any existing data */
1384                 if (ctdb->nodes[j]->known_public_ips) {
1385                         talloc_free(ctdb->nodes[j]->known_public_ips);
1386                         ctdb->nodes[j]->known_public_ips = NULL;
1387                 }
1388                 if (ctdb->nodes[j]->available_public_ips) {
1389                         talloc_free(ctdb->nodes[j]->available_public_ips);
1390                         ctdb->nodes[j]->available_public_ips = NULL;
1391                 }
1392
1393                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1394                         continue;
1395                 }
1396
1397                 /* grab a new shiny list of public ips from the node */
1398                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1399                                         CONTROL_TIMEOUT(),
1400                                         ctdb->nodes[j]->pnn,
1401                                         ctdb->nodes,
1402                                         0,
1403                                         &ctdb->nodes[j]->known_public_ips);
1404                 if (ret != 0) {
1405                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1406                                 ctdb->nodes[j]->pnn));
1407                         if (culprit) {
1408                                 *culprit = ctdb->nodes[j]->pnn;
1409                         }
1410                         return -1;
1411                 }
1412
1413                 if (ctdb->do_checkpublicip) {
1414                         if (rec->ip_check_disable_ctx == NULL) {
1415                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1416                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1417                                         rec->need_takeover_run = true;
1418                                 }
1419                         }
1420                 }
1421
1422                 /* grab a new shiny list of public ips from the node */
1423                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1424                                         CONTROL_TIMEOUT(),
1425                                         ctdb->nodes[j]->pnn,
1426                                         ctdb->nodes,
1427                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1428                                         &ctdb->nodes[j]->available_public_ips);
1429                 if (ret != 0) {
1430                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1431                                 ctdb->nodes[j]->pnn));
1432                         if (culprit) {
1433                                 *culprit = ctdb->nodes[j]->pnn;
1434                         }
1435                         return -1;
1436                 }
1437         }
1438
1439         return 0;
1440 }
1441
1442 /* when we start a recovery, make sure all nodes use the same reclock file
1443    setting
1444 */
1445 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1446 {
1447         struct ctdb_context *ctdb = rec->ctdb;
1448         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1449         TDB_DATA data;
1450         uint32_t *nodes;
1451
1452         if (ctdb->recovery_lock_file == NULL) {
1453                 data.dptr  = NULL;
1454                 data.dsize = 0;
1455         } else {
1456                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1457                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1458         }
1459
1460         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1461         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1462                                         nodes, 0,
1463                                         CONTROL_TIMEOUT(),
1464                                         false, data,
1465                                         NULL, NULL,
1466                                         rec) != 0) {
1467                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1468                 talloc_free(tmp_ctx);
1469                 return -1;
1470         }
1471
1472         talloc_free(tmp_ctx);
1473         return 0;
1474 }
1475
1476
1477 /*
1478   we are the recmaster, and recovery is needed - start a recovery run
1479  */
1480 static int do_recovery(struct ctdb_recoverd *rec, 
1481                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1482                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1483 {
1484         struct ctdb_context *ctdb = rec->ctdb;
1485         int i, j, ret;
1486         uint32_t generation;
1487         struct ctdb_dbid_map *dbmap;
1488         TDB_DATA data;
1489         uint32_t *nodes;
1490         struct timeval start_time;
1491         uint32_t culprit = (uint32_t)-1;
1492
1493         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1494
1495         /* if recovery fails, force it again */
1496         rec->need_recovery = true;
1497
1498         for (i=0; i<ctdb->num_nodes; i++) {
1499                 struct ctdb_banning_state *ban_state;
1500
1501                 if (ctdb->nodes[i]->ban_state == NULL) {
1502                         continue;
1503                 }
1504                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1505                 if (ban_state->count < 2*ctdb->num_nodes) {
1506                         continue;
1507                 }
1508                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1509                         ctdb->nodes[i]->pnn, ban_state->count,
1510                         ctdb->tunable.recovery_ban_period));
1511                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1512                 ban_state->count = 0;
1513         }
1514
1515
1516         if (ctdb->tunable.verify_recovery_lock != 0) {
1517                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1518                 start_time = timeval_current();
1519                 if (!ctdb_recovery_lock(ctdb, true)) {
1520                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1521                                          "and ban ourself for %u seconds\n",
1522                                          ctdb->tunable.recovery_ban_period));
1523                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1524                         return -1;
1525                 }
1526                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1527                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1528         }
1529
1530         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1531
1532         /* get a list of all databases */
1533         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1534         if (ret != 0) {
1535                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1536                 return -1;
1537         }
1538
1539         /* we do the db creation before we set the recovery mode, so the freeze happens
1540            on all databases we will be dealing with. */
1541
1542         /* verify that we have all the databases any other node has */
1543         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1544         if (ret != 0) {
1545                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1546                 return -1;
1547         }
1548
1549         /* verify that all other nodes have all our databases */
1550         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1551         if (ret != 0) {
1552                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1553                 return -1;
1554         }
1555         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1556
1557         /* update the database priority for all remote databases */
1558         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1559         if (ret != 0) {
1560                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1561         }
1562         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1563
1564
1565         /* update all other nodes to use the same setting for reclock files
1566            as the local recovery master.
1567         */
1568         sync_recovery_lock_file_across_cluster(rec);
1569
1570         /* set recovery mode to active on all nodes */
1571         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1572         if (ret != 0) {
1573                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1574                 return -1;
1575         }
1576
1577         /* execute the "startrecovery" event script on all nodes */
1578         ret = run_startrecovery_eventscript(rec, nodemap);
1579         if (ret!=0) {
1580                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1581                 return -1;
1582         }
1583
1584         /*
1585           update all nodes to have the same flags that we have
1586          */
1587         for (i=0;i<nodemap->num;i++) {
1588                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1589                         continue;
1590                 }
1591
1592                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1593                 if (ret != 0) {
1594                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1595                         return -1;
1596                 }
1597         }
1598
1599         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1600
1601         /* pick a new generation number */
1602         generation = new_generation();
1603
1604         /* change the vnnmap on this node to use the new generation 
1605            number but not on any other nodes.
1606            this guarantees that if we abort the recovery prematurely
1607            for some reason (a node stops responding?)
1608            that we can just return immediately and we will reenter
1609            recovery shortly again.
1610            I.e. we deliberately leave the cluster with an inconsistent
1611            generation id to allow us to abort recovery at any stage and
1612            just restart it from scratch.
1613          */
1614         vnnmap->generation = generation;
1615         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1616         if (ret != 0) {
1617                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1618                 return -1;
1619         }
1620
1621         data.dptr = (void *)&generation;
1622         data.dsize = sizeof(uint32_t);
1623
1624         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1625         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1626                                         nodes, 0,
1627                                         CONTROL_TIMEOUT(), false, data,
1628                                         NULL,
1629                                         transaction_start_fail_callback,
1630                                         rec) != 0) {
1631                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1632                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1633                                         nodes, 0,
1634                                         CONTROL_TIMEOUT(), false, tdb_null,
1635                                         NULL,
1636                                         NULL,
1637                                         NULL) != 0) {
1638                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1639                 }
1640                 return -1;
1641         }
1642
1643         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1644
1645         for (i=0;i<dbmap->num;i++) {
1646                 ret = recover_database(rec, mem_ctx,
1647                                        dbmap->dbs[i].dbid,
1648                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1649                                        pnn, nodemap, generation);
1650                 if (ret != 0) {
1651                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1652                         return -1;
1653                 }
1654         }
1655
1656         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1657
1658         /* commit all the changes */
1659         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1660                                         nodes, 0,
1661                                         CONTROL_TIMEOUT(), false, data,
1662                                         NULL, NULL,
1663                                         NULL) != 0) {
1664                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1665                 return -1;
1666         }
1667
1668         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1669         
1670
1671         /* update the capabilities for all nodes */
1672         ret = update_capabilities(ctdb, nodemap);
1673         if (ret!=0) {
1674                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1675                 return -1;
1676         }
1677
1678         /* build a new vnn map with all the currently active and
1679            unbanned nodes */
1680         generation = new_generation();
1681         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1682         CTDB_NO_MEMORY(ctdb, vnnmap);
1683         vnnmap->generation = generation;
1684         vnnmap->size = 0;
1685         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1686         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1687         for (i=j=0;i<nodemap->num;i++) {
1688                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1689                         continue;
1690                 }
1691                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1692                         /* this node can not be an lmaster */
1693                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1694                         continue;
1695                 }
1696
1697                 vnnmap->size++;
1698                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1699                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1700                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1701
1702         }
1703         if (vnnmap->size == 0) {
1704                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1705                 vnnmap->size++;
1706                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1707                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1708                 vnnmap->map[0] = pnn;
1709         }       
1710
1711         /* update to the new vnnmap on all nodes */
1712         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1713         if (ret != 0) {
1714                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1715                 return -1;
1716         }
1717
1718         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1719
1720         /* update recmaster to point to us for all nodes */
1721         ret = set_recovery_master(ctdb, nodemap, pnn);
1722         if (ret!=0) {
1723                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1724                 return -1;
1725         }
1726
1727         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1728
1729         /*
1730           update all nodes to have the same flags that we have
1731          */
1732         for (i=0;i<nodemap->num;i++) {
1733                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1734                         continue;
1735                 }
1736
1737                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1738                 if (ret != 0) {
1739                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1740                         return -1;
1741                 }
1742         }
1743
1744         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1745
1746         /* disable recovery mode */
1747         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1748         if (ret != 0) {
1749                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1750                 return -1;
1751         }
1752
1753         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1754
1755         /*
1756           tell nodes to takeover their public IPs
1757          */
1758         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1759         if (ret != 0) {
1760                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1761                                  culprit));
1762                 rec->need_takeover_run = true;
1763                 return -1;
1764         }
1765         rec->need_takeover_run = false;
1766         ret = ctdb_takeover_run(ctdb, nodemap);
1767         if (ret != 0) {
1768                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1769                 rec->need_takeover_run = true;
1770         }
1771
1772         /* execute the "recovered" event script on all nodes */
1773         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1774         if (ret!=0) {
1775                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1776                 return -1;
1777         }
1778
1779         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1780
1781         /* send a message to all clients telling them that the cluster 
1782            has been reconfigured */
1783         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1784
1785         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1786
1787         rec->need_recovery = false;
1788
1789         /* we managed to complete a full recovery, make sure to forgive
1790            any past sins by the nodes that could now participate in the
1791            recovery.
1792         */
1793         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1794         for (i=0;i<nodemap->num;i++) {
1795                 struct ctdb_banning_state *ban_state;
1796
1797                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1798                         continue;
1799                 }
1800
1801                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1802                 if (ban_state == NULL) {
1803                         continue;
1804                 }
1805
1806                 ban_state->count = 0;
1807         }
1808
1809
1810         /* We just finished a recovery successfully. 
1811            We now wait for rerecovery_timeout before we allow 
1812            another recovery to take place.
1813         */
1814         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1815         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1816         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1817
1818         return 0;
1819 }
1820
1821
1822 /*
1823   elections are won by first checking the number of connected nodes, then
1824   the priority time, then the pnn
1825  */
1826 struct election_message {
1827         uint32_t num_connected;
1828         struct timeval priority_time;
1829         uint32_t pnn;
1830         uint32_t node_flags;
1831 };
1832
1833 /*
1834   form this nodes election data
1835  */
1836 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1837 {
1838         int ret, i;
1839         struct ctdb_node_map *nodemap;
1840         struct ctdb_context *ctdb = rec->ctdb;
1841
1842         ZERO_STRUCTP(em);
1843
1844         em->pnn = rec->ctdb->pnn;
1845         em->priority_time = rec->priority_time;
1846
1847         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1848         if (ret != 0) {
1849                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1850                 return;
1851         }
1852
1853         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1854         em->node_flags = rec->node_flags;
1855
1856         for (i=0;i<nodemap->num;i++) {
1857                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1858                         em->num_connected++;
1859                 }
1860         }
1861
1862         /* we shouldnt try to win this election if we cant be a recmaster */
1863         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1864                 em->num_connected = 0;
1865                 em->priority_time = timeval_current();
1866         }
1867
1868         talloc_free(nodemap);
1869 }
1870
1871 /*
1872   see if the given election data wins
1873  */
1874 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1875 {
1876         struct election_message myem;
1877         int cmp = 0;
1878
1879         ctdb_election_data(rec, &myem);
1880
1881         /* we cant win if we dont have the recmaster capability */
1882         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1883                 return false;
1884         }
1885
1886         /* we cant win if we are banned */
1887         if (rec->node_flags & NODE_FLAGS_BANNED) {
1888                 return false;
1889         }       
1890
1891         /* we cant win if we are stopped */
1892         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1893                 return false;
1894         }       
1895
1896         /* we will automatically win if the other node is banned */
1897         if (em->node_flags & NODE_FLAGS_BANNED) {
1898                 return true;
1899         }
1900
1901         /* we will automatically win if the other node is banned */
1902         if (em->node_flags & NODE_FLAGS_STOPPED) {
1903                 return true;
1904         }
1905
1906         /* try to use the most connected node */
1907         if (cmp == 0) {
1908                 cmp = (int)myem.num_connected - (int)em->num_connected;
1909         }
1910
1911         /* then the longest running node */
1912         if (cmp == 0) {
1913                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1914         }
1915
1916         if (cmp == 0) {
1917                 cmp = (int)myem.pnn - (int)em->pnn;
1918         }
1919
1920         return cmp > 0;
1921 }
1922
1923 /*
1924   send out an election request
1925  */
1926 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1927 {
1928         int ret;
1929         TDB_DATA election_data;
1930         struct election_message emsg;
1931         uint64_t srvid;
1932         struct ctdb_context *ctdb = rec->ctdb;
1933
1934         srvid = CTDB_SRVID_RECOVERY;
1935
1936         ctdb_election_data(rec, &emsg);
1937
1938         election_data.dsize = sizeof(struct election_message);
1939         election_data.dptr  = (unsigned char *)&emsg;
1940
1941
1942         /* send an election message to all active nodes */
1943         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1944         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1945
1946
1947         /* A new node that is already frozen has entered the cluster.
1948            The existing nodes are not frozen and dont need to be frozen
1949            until the election has ended and we start the actual recovery
1950         */
1951         if (update_recmaster == true) {
1952                 /* first we assume we will win the election and set 
1953                    recoverymaster to be ourself on the current node
1954                  */
1955                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1956                 if (ret != 0) {
1957                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1958                         return -1;
1959                 }
1960         }
1961
1962
1963         return 0;
1964 }
1965
1966 /*
1967   this function will unban all nodes in the cluster
1968 */
1969 static void unban_all_nodes(struct ctdb_context *ctdb)
1970 {
1971         int ret, i;
1972         struct ctdb_node_map *nodemap;
1973         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1974         
1975         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1976         if (ret != 0) {
1977                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1978                 return;
1979         }
1980
1981         for (i=0;i<nodemap->num;i++) {
1982                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1983                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1984                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1985                 }
1986         }
1987
1988         talloc_free(tmp_ctx);
1989 }
1990
1991
1992 /*
1993   we think we are winning the election - send a broadcast election request
1994  */
1995 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1996 {
1997         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1998         int ret;
1999
2000         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2001         if (ret != 0) {
2002                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2003         }
2004
2005         talloc_free(rec->send_election_te);
2006         rec->send_election_te = NULL;
2007 }
2008
2009 /*
2010   handler for memory dumps
2011 */
2012 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2013                              TDB_DATA data, void *private_data)
2014 {
2015         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2016         TDB_DATA *dump;
2017         int ret;
2018         struct rd_memdump_reply *rd;
2019
2020         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2021                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2022                 talloc_free(tmp_ctx);
2023                 return;
2024         }
2025         rd = (struct rd_memdump_reply *)data.dptr;
2026
2027         dump = talloc_zero(tmp_ctx, TDB_DATA);
2028         if (dump == NULL) {
2029                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2030                 talloc_free(tmp_ctx);
2031                 return;
2032         }
2033         ret = ctdb_dump_memory(ctdb, dump);
2034         if (ret != 0) {
2035                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2036                 talloc_free(tmp_ctx);
2037                 return;
2038         }
2039
2040 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2041
2042         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2043         if (ret != 0) {
2044                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2045                 talloc_free(tmp_ctx);
2046                 return;
2047         }
2048
2049         talloc_free(tmp_ctx);
2050 }
2051
2052 /*
2053   handler for reload_nodes
2054 */
2055 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2056                              TDB_DATA data, void *private_data)
2057 {
2058         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2059
2060         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2061
2062         reload_nodes_file(rec->ctdb);
2063 }
2064
2065
2066 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2067                               struct timeval yt, void *p)
2068 {
2069         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2070
2071         talloc_free(rec->ip_check_disable_ctx);
2072         rec->ip_check_disable_ctx = NULL;
2073 }
2074
2075
2076 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2077                                   struct timeval t, void *p)
2078 {
2079         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2080         struct ctdb_context *ctdb = rec->ctdb;
2081         int ret;
2082
2083         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2084
2085         ret = ctdb_takeover_run(ctdb, rec->nodemap);
2086         if (ret != 0) {
2087                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2088                 rec->need_takeover_run = true;
2089         }
2090
2091         talloc_free(rec->deferred_rebalance_ctx);
2092         rec->deferred_rebalance_ctx = NULL;
2093 }
2094
2095         
2096 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2097                              TDB_DATA data, void *private_data)
2098 {
2099         uint32_t pnn;
2100         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2101
2102         if (data.dsize != sizeof(uint32_t)) {
2103                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2104                 return;
2105         }
2106
2107         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2108                 return;
2109         }
2110
2111         pnn = *(uint32_t *)&data.dptr[0];
2112
2113         lcp2_forcerebalance(ctdb, pnn);
2114         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2115
2116         if (rec->deferred_rebalance_ctx != NULL) {
2117                 talloc_free(rec->deferred_rebalance_ctx);
2118         }
2119         rec->deferred_rebalance_ctx = talloc_new(rec);
2120         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2121                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2122                         ctdb_rebalance_timeout, rec);
2123 }
2124
2125
2126
2127 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2128                              TDB_DATA data, void *private_data)
2129 {
2130         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2131         struct ctdb_public_ip *ip;
2132
2133         if (rec->recmaster != rec->ctdb->pnn) {
2134                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2135                 return;
2136         }
2137
2138         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2139                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2140                 return;
2141         }
2142
2143         ip = (struct ctdb_public_ip *)data.dptr;
2144
2145         update_ip_assignment_tree(rec->ctdb, ip);
2146 }
2147
2148
2149 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2150                              TDB_DATA data, void *private_data)
2151 {
2152         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2153         uint32_t timeout;
2154
2155         if (rec->ip_check_disable_ctx != NULL) {
2156                 talloc_free(rec->ip_check_disable_ctx);
2157                 rec->ip_check_disable_ctx = NULL;
2158         }
2159
2160         if (data.dsize != sizeof(uint32_t)) {
2161                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2162                                  "expexting %lu\n", (long unsigned)data.dsize,
2163                                  (long unsigned)sizeof(uint32_t)));
2164                 return;
2165         }
2166         if (data.dptr == NULL) {
2167                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2168                 return;
2169         }
2170
2171         timeout = *((uint32_t *)data.dptr);
2172         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2173
2174         rec->ip_check_disable_ctx = talloc_new(rec);
2175         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2176
2177         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2178 }
2179
2180
2181 /*
2182   handler for reload all ips.
2183 */
2184 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2185                              TDB_DATA data, void *private_data)
2186 {
2187         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2188
2189         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2190                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2191                 return;
2192         }
2193
2194         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2195
2196         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2197         return;
2198 }
2199
2200 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2201 {
2202         uint32_t *status = callback_data;
2203
2204         if (res != 0) {
2205                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2206                 *status = 1;
2207         }
2208 }
2209
2210 static int
2211 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2212 {
2213         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2214         uint32_t *nodes;
2215         uint32_t status;
2216         int i;
2217
2218         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2219         for (i = 0; i< nodemap->num; i++) {
2220                 if (nodemap->nodes[i].flags != 0) {
2221                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2222                         talloc_free(tmp_ctx);
2223                         return -1;
2224                 }
2225         }
2226
2227         /* send the flags update to all connected nodes */
2228         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2229         status = 0;
2230         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2231                                         nodes, 0,
2232                                         CONTROL_TIMEOUT(),
2233                                         false, tdb_null,
2234                                         async_reloadips_callback, NULL,
2235                                         &status) != 0) {
2236                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2237                 talloc_free(tmp_ctx);
2238                 return -1;
2239         }
2240
2241         if (status != 0) {
2242                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2243                 talloc_free(tmp_ctx);
2244                 return -1;
2245         }
2246
2247         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2248
2249         talloc_free(tmp_ctx);
2250         return 0;
2251 }
2252
2253
2254 /*
2255   handler for ip reallocate, just add it to the list of callers and 
2256   handle this later in the monitor_cluster loop so we do not recurse
2257   with other callers to takeover_run()
2258 */
2259 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2260                              TDB_DATA data, void *private_data)
2261 {
2262         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2263         struct ip_reallocate_list *caller;
2264
2265         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2266                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2267                 return;
2268         }
2269
2270         if (rec->ip_reallocate_ctx == NULL) {
2271                 rec->ip_reallocate_ctx = talloc_new(rec);
2272                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2273         }
2274
2275         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2276         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2277
2278         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2279         caller->next = rec->reallocate_callers;
2280         rec->reallocate_callers = caller;
2281
2282         return;
2283 }
2284
2285 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2286 {
2287         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2288         TDB_DATA result;
2289         int32_t ret;
2290         struct ip_reallocate_list *callers;
2291         uint32_t culprit;
2292
2293         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2294
2295         /* update the list of public ips that a node can handle for
2296            all connected nodes
2297         */
2298         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2299         if (ret != 0) {
2300                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2301                                  culprit));
2302                 rec->need_takeover_run = true;
2303         }
2304         if (ret == 0) {
2305                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2306                 if (ret != 0) {
2307                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2308                         rec->need_takeover_run = true;
2309                 }
2310         }
2311
2312         result.dsize = sizeof(int32_t);
2313         result.dptr  = (uint8_t *)&ret;
2314
2315         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2316
2317                 /* Someone that sent srvid==0 does not want a reply */
2318                 if (callers->rd->srvid == 0) {
2319                         continue;
2320                 }
2321                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2322                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2323                                   (unsigned long long)callers->rd->srvid));
2324                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2325                 if (ret != 0) {
2326                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2327                                          "message to %u:%llu\n",
2328                                          (unsigned)callers->rd->pnn,
2329                                          (unsigned long long)callers->rd->srvid));
2330                 }
2331         }
2332
2333         talloc_free(tmp_ctx);
2334         talloc_free(rec->ip_reallocate_ctx);
2335         rec->ip_reallocate_ctx = NULL;
2336         rec->reallocate_callers = NULL;
2337         
2338 }
2339
2340
2341 /*
2342   handler for recovery master elections
2343 */
2344 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2345                              TDB_DATA data, void *private_data)
2346 {
2347         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2348         int ret;
2349         struct election_message *em = (struct election_message *)data.dptr;
2350         TALLOC_CTX *mem_ctx;
2351
2352         /* we got an election packet - update the timeout for the election */
2353         talloc_free(rec->election_timeout);
2354         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2355                                                 fast_start ?
2356                                                 timeval_current_ofs(0, 500000) :
2357                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2358                                                 ctdb_election_timeout, rec);
2359
2360         mem_ctx = talloc_new(ctdb);
2361
2362         /* someone called an election. check their election data
2363            and if we disagree and we would rather be the elected node, 
2364            send a new election message to all other nodes
2365          */
2366         if (ctdb_election_win(rec, em)) {
2367                 if (!rec->send_election_te) {
2368                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2369                                                                 timeval_current_ofs(0, 500000),
2370                                                                 election_send_request, rec);
2371                 }
2372                 talloc_free(mem_ctx);
2373                 /*unban_all_nodes(ctdb);*/
2374                 return;
2375         }
2376         
2377         /* we didn't win */
2378         talloc_free(rec->send_election_te);
2379         rec->send_election_te = NULL;
2380
2381         if (ctdb->tunable.verify_recovery_lock != 0) {
2382                 /* release the recmaster lock */
2383                 if (em->pnn != ctdb->pnn &&
2384                     ctdb->recovery_lock_fd != -1) {
2385                         close(ctdb->recovery_lock_fd);
2386                         ctdb->recovery_lock_fd = -1;
2387                         unban_all_nodes(ctdb);
2388                 }
2389         }
2390
2391         /* ok, let that guy become recmaster then */
2392         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2393         if (ret != 0) {
2394                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2395                 talloc_free(mem_ctx);
2396                 return;
2397         }
2398
2399         talloc_free(mem_ctx);
2400         return;
2401 }
2402
2403
2404 /*
2405   force the start of the election process
2406  */
2407 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2408                            struct ctdb_node_map *nodemap)
2409 {
2410         int ret;
2411         struct ctdb_context *ctdb = rec->ctdb;
2412
2413         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2414
2415         /* set all nodes to recovery mode to stop all internode traffic */
2416         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2417         if (ret != 0) {
2418                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2419                 return;
2420         }
2421
2422         talloc_free(rec->election_timeout);
2423         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2424                                                 fast_start ?
2425                                                 timeval_current_ofs(0, 500000) :
2426                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2427                                                 ctdb_election_timeout, rec);
2428
2429         ret = send_election_request(rec, pnn, true);
2430         if (ret!=0) {
2431                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2432                 return;
2433         }
2434
2435         /* wait for a few seconds to collect all responses */
2436         ctdb_wait_election(rec);
2437 }
2438
2439
2440
2441 /*
2442   handler for when a node changes its flags
2443 */
2444 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2445                             TDB_DATA data, void *private_data)
2446 {
2447         int ret;
2448         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2449         struct ctdb_node_map *nodemap=NULL;
2450         TALLOC_CTX *tmp_ctx;
2451         int i;
2452         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2453         int disabled_flag_changed;
2454
2455         if (data.dsize != sizeof(*c)) {
2456                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2457                 return;
2458         }
2459
2460         tmp_ctx = talloc_new(ctdb);
2461         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2462
2463         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2464         if (ret != 0) {
2465                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2466                 talloc_free(tmp_ctx);
2467                 return;         
2468         }
2469
2470
2471         for (i=0;i<nodemap->num;i++) {
2472                 if (nodemap->nodes[i].pnn == c->pnn) break;
2473         }
2474
2475         if (i == nodemap->num) {
2476                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2477                 talloc_free(tmp_ctx);
2478                 return;
2479         }
2480
2481         if (nodemap->nodes[i].flags != c->new_flags) {
2482                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2483         }
2484
2485         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2486
2487         nodemap->nodes[i].flags = c->new_flags;
2488
2489         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2490                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2491
2492         if (ret == 0) {
2493                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2494                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2495         }
2496         
2497         if (ret == 0 &&
2498             ctdb->recovery_master == ctdb->pnn &&
2499             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2500                 /* Only do the takeover run if the perm disabled or unhealthy
2501                    flags changed since these will cause an ip failover but not
2502                    a recovery.
2503                    If the node became disconnected or banned this will also
2504                    lead to an ip address failover but that is handled 
2505                    during recovery
2506                 */
2507                 if (disabled_flag_changed) {
2508                         rec->need_takeover_run = true;
2509                 }
2510         }
2511
2512         talloc_free(tmp_ctx);
2513 }
2514
2515 /*
2516   handler for when we need to push out flag changes ot all other nodes
2517 */
2518 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2519                             TDB_DATA data, void *private_data)
2520 {
2521         int ret;
2522         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2523         struct ctdb_node_map *nodemap=NULL;
2524         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2525         uint32_t recmaster;
2526         uint32_t *nodes;
2527
2528         /* find the recovery master */
2529         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2530         if (ret != 0) {
2531                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2532                 talloc_free(tmp_ctx);
2533                 return;
2534         }
2535
2536         /* read the node flags from the recmaster */
2537         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2538         if (ret != 0) {
2539                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2540                 talloc_free(tmp_ctx);
2541                 return;
2542         }
2543         if (c->pnn >= nodemap->num) {
2544                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2545                 talloc_free(tmp_ctx);
2546                 return;
2547         }
2548
2549         /* send the flags update to all connected nodes */
2550         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2551
2552         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2553                                       nodes, 0, CONTROL_TIMEOUT(),
2554                                       false, data,
2555                                       NULL, NULL,
2556                                       NULL) != 0) {
2557                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2558
2559                 talloc_free(tmp_ctx);
2560                 return;
2561         }
2562
2563         talloc_free(tmp_ctx);
2564 }
2565
2566
2567 struct verify_recmode_normal_data {
2568         uint32_t count;
2569         enum monitor_result status;
2570 };
2571
2572 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2573 {
2574         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2575
2576
2577         /* one more node has responded with recmode data*/
2578         rmdata->count--;
2579
2580         /* if we failed to get the recmode, then return an error and let
2581            the main loop try again.
2582         */
2583         if (state->state != CTDB_CONTROL_DONE) {
2584                 if (rmdata->status == MONITOR_OK) {
2585                         rmdata->status = MONITOR_FAILED;
2586                 }
2587                 return;
2588         }
2589
2590         /* if we got a response, then the recmode will be stored in the
2591            status field
2592         */
2593         if (state->status != CTDB_RECOVERY_NORMAL) {
2594                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2595                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2596         }
2597
2598         return;
2599 }
2600
2601
2602 /* verify that all nodes are in normal recovery mode */
2603 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2604 {
2605         struct verify_recmode_normal_data *rmdata;
2606         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2607         struct ctdb_client_control_state *state;
2608         enum monitor_result status;
2609         int j;
2610         
2611         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2612         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2613         rmdata->count  = 0;
2614         rmdata->status = MONITOR_OK;
2615
2616         /* loop over all active nodes and send an async getrecmode call to 
2617            them*/
2618         for (j=0; j<nodemap->num; j++) {
2619                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2620                         continue;
2621                 }
2622                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2623                                         CONTROL_TIMEOUT(), 
2624                                         nodemap->nodes[j].pnn);
2625                 if (state == NULL) {
2626                         /* we failed to send the control, treat this as 
2627                            an error and try again next iteration
2628                         */                      
2629                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2630                         talloc_free(mem_ctx);
2631                         return MONITOR_FAILED;
2632                 }
2633
2634                 /* set up the callback functions */
2635                 state->async.fn = verify_recmode_normal_callback;
2636                 state->async.private_data = rmdata;
2637
2638                 /* one more control to wait for to complete */
2639                 rmdata->count++;
2640         }
2641
2642
2643         /* now wait for up to the maximum number of seconds allowed
2644            or until all nodes we expect a response from has replied
2645         */
2646         while (rmdata->count > 0) {
2647                 event_loop_once(ctdb->ev);
2648         }
2649
2650         status = rmdata->status;
2651         talloc_free(mem_ctx);
2652         return status;
2653 }
2654
2655
2656 struct verify_recmaster_data {
2657         struct ctdb_recoverd *rec;
2658         uint32_t count;
2659         uint32_t pnn;
2660         enum monitor_result status;
2661 };
2662
2663 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2664 {
2665         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2666
2667
2668         /* one more node has responded with recmaster data*/
2669         rmdata->count--;
2670
2671         /* if we failed to get the recmaster, then return an error and let
2672            the main loop try again.
2673         */
2674         if (state->state != CTDB_CONTROL_DONE) {
2675                 if (rmdata->status == MONITOR_OK) {
2676                         rmdata->status = MONITOR_FAILED;
2677                 }
2678                 return;
2679         }
2680
2681         /* if we got a response, then the recmaster will be stored in the
2682            status field
2683         */
2684         if (state->status != rmdata->pnn) {
2685                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2686                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2687                 rmdata->status = MONITOR_ELECTION_NEEDED;
2688         }
2689
2690         return;
2691 }
2692
2693
2694 /* verify that all nodes agree that we are the recmaster */
2695 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2696 {
2697         struct ctdb_context *ctdb = rec->ctdb;
2698         struct verify_recmaster_data *rmdata;
2699         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2700         struct ctdb_client_control_state *state;
2701         enum monitor_result status;
2702         int j;
2703         
2704         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2705         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2706         rmdata->rec    = rec;
2707         rmdata->count  = 0;
2708         rmdata->pnn    = pnn;
2709         rmdata->status = MONITOR_OK;
2710
2711         /* loop over all active nodes and send an async getrecmaster call to 
2712            them*/
2713         for (j=0; j<nodemap->num; j++) {
2714                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2715                         continue;
2716                 }
2717                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2718                                         CONTROL_TIMEOUT(),
2719                                         nodemap->nodes[j].pnn);
2720                 if (state == NULL) {
2721                         /* we failed to send the control, treat this as 
2722                            an error and try again next iteration
2723                         */                      
2724                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2725                         talloc_free(mem_ctx);
2726                         return MONITOR_FAILED;
2727                 }
2728
2729                 /* set up the callback functions */
2730                 state->async.fn = verify_recmaster_callback;
2731                 state->async.private_data = rmdata;
2732
2733                 /* one more control to wait for to complete */
2734                 rmdata->count++;
2735         }
2736
2737
2738         /* now wait for up to the maximum number of seconds allowed
2739            or until all nodes we expect a response from has replied
2740         */
2741         while (rmdata->count > 0) {
2742                 event_loop_once(ctdb->ev);
2743         }
2744
2745         status = rmdata->status;
2746         talloc_free(mem_ctx);
2747         return status;
2748 }
2749
2750
2751 /* called to check that the local allocation of public ip addresses is ok.
2752 */
2753 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2754 {
2755         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2756         struct ctdb_control_get_ifaces *ifaces = NULL;
2757         struct ctdb_all_public_ips *ips = NULL;
2758         struct ctdb_uptime *uptime1 = NULL;
2759         struct ctdb_uptime *uptime2 = NULL;
2760         int ret, j;
2761         bool need_iface_check = false;
2762         bool need_takeover_run = false;
2763
2764         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2765                                 CTDB_CURRENT_NODE, &uptime1);
2766         if (ret != 0) {
2767                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2768                 talloc_free(mem_ctx);
2769                 return -1;
2770         }
2771
2772
2773         /* read the interfaces from the local node */
2774         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2775         if (ret != 0) {
2776                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2777                 talloc_free(mem_ctx);
2778                 return -1;
2779         }
2780
2781         if (!rec->ifaces) {
2782                 need_iface_check = true;
2783         } else if (rec->ifaces->num != ifaces->num) {
2784                 need_iface_check = true;
2785         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2786                 need_iface_check = true;
2787         }
2788
2789         if (need_iface_check) {
2790                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2791                                      "local node %u - force takeover run\n",
2792                                      pnn));
2793                 need_takeover_run = true;
2794         }
2795
2796         /* read the ip allocation from the local node */
2797         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2798         if (ret != 0) {
2799                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2800                 talloc_free(mem_ctx);
2801                 return -1;
2802         }
2803
2804         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2805                                 CTDB_CURRENT_NODE, &uptime2);
2806         if (ret != 0) {
2807                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2808                 talloc_free(mem_ctx);
2809                 return -1;
2810         }
2811
2812         /* skip the check if the startrecovery time has changed */
2813         if (timeval_compare(&uptime1->last_recovery_started,
2814                             &uptime2->last_recovery_started) != 0) {
2815                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2816                 talloc_free(mem_ctx);
2817                 return 0;
2818         }
2819
2820         /* skip the check if the endrecovery time has changed */
2821         if (timeval_compare(&uptime1->last_recovery_finished,
2822                             &uptime2->last_recovery_finished) != 0) {
2823                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2824                 talloc_free(mem_ctx);
2825                 return 0;
2826         }
2827
2828         /* skip the check if we have started but not finished recovery */
2829         if (timeval_compare(&uptime1->last_recovery_finished,
2830                             &uptime1->last_recovery_started) != 1) {
2831                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2832                 talloc_free(mem_ctx);
2833
2834                 return 0;
2835         }
2836
2837         talloc_free(rec->ifaces);
2838         rec->ifaces = talloc_steal(rec, ifaces);
2839
2840         /* verify that we have the ip addresses we should have
2841            and we dont have ones we shouldnt have.
2842            if we find an inconsistency we set recmode to
2843            active on the local node and wait for the recmaster
2844            to do a full blown recovery.
2845            also if the pnn is -1 and we are healthy and can host the ip
2846            we also request a ip reallocation.
2847         */
2848         if (ctdb->tunable.disable_ip_failover == 0) {
2849                 for (j=0; j<ips->num; j++) {
2850                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2851                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2852                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2853                                 need_takeover_run = true;
2854                         } else if (ips->ips[j].pnn == pnn) {
2855                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
2856                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2857                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2858                                         need_takeover_run = true;
2859                                 }
2860                         } else {
2861                                 if (ctdb->do_checkpublicip && ctdb_sys_have_ip(&ips->ips[j].addr)) {
2862                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2863                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2864                                         need_takeover_run = true;
2865                                 }
2866                         }
2867                 }
2868         }
2869
2870         if (need_takeover_run) {
2871                 struct takeover_run_reply rd;
2872                 TDB_DATA data;
2873
2874                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2875
2876                 rd.pnn = ctdb->pnn;
2877                 rd.srvid = 0;
2878                 data.dptr = (uint8_t *)&rd;
2879                 data.dsize = sizeof(rd);
2880
2881                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2882                 if (ret != 0) {
2883                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2884                 }
2885         }
2886         talloc_free(mem_ctx);
2887         return 0;
2888 }
2889
2890
2891 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2892 {
2893         struct ctdb_node_map **remote_nodemaps = callback_data;
2894
2895         if (node_pnn >= ctdb->num_nodes) {
2896                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2897                 return;
2898         }
2899
2900         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2901
2902 }
2903
2904 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2905         struct ctdb_node_map *nodemap,
2906         struct ctdb_node_map **remote_nodemaps)
2907 {
2908         uint32_t *nodes;
2909
2910         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2911         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2912                                         nodes, 0,
2913                                         CONTROL_TIMEOUT(), false, tdb_null,
2914                                         async_getnodemap_callback,
2915                                         NULL,
2916                                         remote_nodemaps) != 0) {
2917                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2918
2919                 return -1;
2920         }
2921
2922         return 0;
2923 }
2924
2925 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2926 struct ctdb_check_reclock_state {
2927         struct ctdb_context *ctdb;
2928         struct timeval start_time;
2929         int fd[2];
2930         pid_t child;
2931         struct timed_event *te;
2932         struct fd_event *fde;
2933         enum reclock_child_status status;
2934 };
2935
2936 /* when we free the reclock state we must kill any child process.
2937 */
2938 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2939 {
2940         struct ctdb_context *ctdb = state->ctdb;
2941
2942         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2943
2944         if (state->fd[0] != -1) {
2945                 close(state->fd[0]);
2946                 state->fd[0] = -1;
2947         }
2948         if (state->fd[1] != -1) {
2949                 close(state->fd[1]);
2950                 state->fd[1] = -1;
2951         }
2952         kill(state->child, SIGKILL);
2953         return 0;
2954 }
2955
2956 /*
2957   called if our check_reclock child times out. this would happen if
2958   i/o to the reclock file blocks.
2959  */
2960 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2961                                          struct timeval t, void *private_data)
2962 {
2963         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2964                                            struct ctdb_check_reclock_state);
2965
2966         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2967         state->status = RECLOCK_TIMEOUT;
2968 }
2969
2970 /* this is called when the child process has completed checking the reclock
2971    file and has written data back to us through the pipe.
2972 */
2973 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2974                              uint16_t flags, void *private_data)
2975 {
2976         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2977                                              struct ctdb_check_reclock_state);
2978         char c = 0;
2979         int ret;
2980
2981         /* we got a response from our child process so we can abort the
2982            timeout.
2983         */
2984         talloc_free(state->te);
2985         state->te = NULL;
2986
2987         ret = read(state->fd[0], &c, 1);
2988         if (ret != 1 || c != RECLOCK_OK) {
2989                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2990                 state->status = RECLOCK_FAILED;
2991
2992                 return;
2993         }
2994
2995         state->status = RECLOCK_OK;
2996         return;
2997 }
2998
2999 static int check_recovery_lock(struct ctdb_context *ctdb)
3000 {
3001         int ret;
3002         struct ctdb_check_reclock_state *state;
3003         pid_t parent = getpid();
3004
3005         if (ctdb->recovery_lock_fd == -1) {
3006                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3007                 return -1;
3008         }
3009
3010         state = talloc(ctdb, struct ctdb_check_reclock_state);
3011         CTDB_NO_MEMORY(ctdb, state);
3012
3013         state->ctdb = ctdb;
3014         state->start_time = timeval_current();
3015         state->status = RECLOCK_CHECKING;
3016         state->fd[0] = -1;
3017         state->fd[1] = -1;
3018
3019         ret = pipe(state->fd);
3020         if (ret != 0) {
3021                 talloc_free(state);
3022                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3023                 return -1;
3024         }
3025
3026         state->child = ctdb_fork(ctdb);
3027         if (state->child == (pid_t)-1) {
3028                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3029                 close(state->fd[0]);
3030                 state->fd[0] = -1;
3031                 close(state->fd[1]);
3032                 state->fd[1] = -1;
3033                 talloc_free(state);
3034                 return -1;
3035         }
3036
3037         if (state->child == 0) {
3038                 char cc = RECLOCK_OK;
3039                 close(state->fd[0]);
3040                 state->fd[0] = -1;
3041
3042                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3043                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3044                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3045                         cc = RECLOCK_FAILED;
3046                 }
3047
3048                 write(state->fd[1], &cc, 1);
3049                 /* make sure we die when our parent dies */
3050                 while (kill(parent, 0) == 0 || errno != ESRCH) {
3051                         sleep(5);
3052                         write(state->fd[1], &cc, 1);
3053                 }
3054                 _exit(0);
3055         }
3056         close(state->fd[1]);
3057         state->fd[1] = -1;
3058         set_close_on_exec(state->fd[0]);
3059
3060         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3061
3062         talloc_set_destructor(state, check_reclock_destructor);
3063
3064         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3065                                     ctdb_check_reclock_timeout, state);
3066         if (state->te == NULL) {
3067                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3068                 talloc_free(state);
3069                 return -1;
3070         }
3071
3072         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3073                                 EVENT_FD_READ,
3074                                 reclock_child_handler,
3075                                 (void *)state);
3076
3077         if (state->fde == NULL) {
3078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3079                 talloc_free(state);
3080                 return -1;
3081         }
3082         tevent_fd_set_auto_close(state->fde);
3083
3084         while (state->status == RECLOCK_CHECKING) {
3085                 event_loop_once(ctdb->ev);
3086         }
3087
3088         if (state->status == RECLOCK_FAILED) {
3089                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3090                 close(ctdb->recovery_lock_fd);
3091                 ctdb->recovery_lock_fd = -1;
3092                 talloc_free(state);
3093                 return -1;
3094         }
3095
3096         talloc_free(state);
3097         return 0;
3098 }
3099
3100 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3101 {
3102         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3103         const char *reclockfile;
3104
3105         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3106                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3107                 talloc_free(tmp_ctx);
3108                 return -1;      
3109         }
3110
3111         if (reclockfile == NULL) {
3112                 if (ctdb->recovery_lock_file != NULL) {
3113                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3114                         talloc_free(ctdb->recovery_lock_file);
3115                         ctdb->recovery_lock_file = NULL;
3116                         if (ctdb->recovery_lock_fd != -1) {
3117                                 close(ctdb->recovery_lock_fd);
3118                                 ctdb->recovery_lock_fd = -1;
3119                         }
3120                 }
3121                 ctdb->tunable.verify_recovery_lock = 0;
3122                 talloc_free(tmp_ctx);
3123                 return 0;
3124         }
3125
3126         if (ctdb->recovery_lock_file == NULL) {
3127                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3128                 if (ctdb->recovery_lock_fd != -1) {
3129                         close(ctdb->recovery_lock_fd);
3130                         ctdb->recovery_lock_fd = -1;
3131                 }
3132                 talloc_free(tmp_ctx);
3133                 return 0;
3134         }
3135
3136
3137         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3138                 talloc_free(tmp_ctx);
3139                 return 0;
3140         }
3141
3142         talloc_free(ctdb->recovery_lock_file);
3143         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3144         ctdb->tunable.verify_recovery_lock = 0;
3145         if (ctdb->recovery_lock_fd != -1) {
3146                 close(ctdb->recovery_lock_fd);
3147                 ctdb->recovery_lock_fd = -1;
3148         }
3149
3150         talloc_free(tmp_ctx);
3151         return 0;
3152 }
3153
3154 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3155                       TALLOC_CTX *mem_ctx)
3156 {
3157         uint32_t pnn;
3158         struct ctdb_node_map *nodemap=NULL;
3159         struct ctdb_node_map *recmaster_nodemap=NULL;
3160         struct ctdb_node_map **remote_nodemaps=NULL;
3161         struct ctdb_vnn_map *vnnmap=NULL;
3162         struct ctdb_vnn_map *remote_vnnmap=NULL;
3163         int32_t debug_level;
3164         int i, j, ret;
3165
3166
3167
3168         /* verify that the main daemon is still running */
3169         if (kill(ctdb->ctdbd_pid, 0) != 0) {
3170                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3171                 exit(-1);
3172         }
3173
3174         /* ping the local daemon to tell it we are alive */
3175         ctdb_ctrl_recd_ping(ctdb);
3176
3177         if (rec->election_timeout) {
3178                 /* an election is in progress */
3179                 return;
3180         }
3181
3182         /* read the debug level from the parent and update locally */
3183         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3184         if (ret !=0) {
3185                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3186                 return;
3187         }
3188         LogLevel = debug_level;
3189
3190
3191         /* We must check if we need to ban a node here but we want to do this
3192            as early as possible so we dont wait until we have pulled the node
3193            map from the local node. thats why we have the hardcoded value 20
3194         */
3195         for (i=0; i<ctdb->num_nodes; i++) {
3196                 struct ctdb_banning_state *ban_state;
3197
3198                 if (ctdb->nodes[i]->ban_state == NULL) {
3199                         continue;
3200                 }
3201                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3202                 if (ban_state->count < 20) {
3203                         continue;
3204                 }
3205                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3206                         ctdb->nodes[i]->pnn, ban_state->count,
3207                         ctdb->tunable.recovery_ban_period));
3208                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3209                 ban_state->count = 0;
3210         }
3211
3212         /* get relevant tunables */
3213         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3214         if (ret != 0) {
3215                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3216                 return;
3217         }
3218
3219         /* get the current recovery lock file from the server */
3220         if (update_recovery_lock_file(ctdb) != 0) {
3221                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3222                 return;
3223         }
3224
3225         /* Make sure that if recovery lock verification becomes disabled when
3226            we close the file
3227         */
3228         if (ctdb->tunable.verify_recovery_lock == 0) {
3229                 if (ctdb->recovery_lock_fd != -1) {
3230                         close(ctdb->recovery_lock_fd);
3231                         ctdb->recovery_lock_fd = -1;
3232                 }
3233         }
3234
3235         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3236         if (pnn == (uint32_t)-1) {
3237                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3238                 return;
3239         }
3240
3241         /* get the vnnmap */
3242         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3243         if (ret != 0) {
3244                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3245                 return;
3246         }
3247
3248
3249         /* get number of nodes */
3250         if (rec->nodemap) {
3251                 talloc_free(rec->nodemap);
3252                 rec->nodemap = NULL;
3253                 nodemap=NULL;
3254         }
3255         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3256         if (ret != 0) {
3257                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3258                 return;
3259         }
3260         nodemap = rec->nodemap;
3261
3262         /* update the capabilities for all nodes */
3263         ret = update_capabilities(ctdb, nodemap);
3264         if (ret != 0) {
3265                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3266                 return;
3267         }
3268
3269         /* check which node is the recovery master */
3270         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3271         if (ret != 0) {
3272                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3273                 return;
3274         }
3275
3276         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3277         if (rec->recmaster != pnn) {
3278                 if (rec->ip_reallocate_ctx != NULL) {
3279                         talloc_free(rec->ip_reallocate_ctx);
3280                         rec->ip_reallocate_ctx = NULL;
3281                         rec->reallocate_callers = NULL;
3282                 }
3283         }
3284
3285         if (rec->recmaster == (uint32_t)-1) {
3286                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3287                 force_election(rec, pnn, nodemap);
3288                 return;
3289         }
3290
3291         /* if the local daemon is STOPPED, we verify that the databases are
3292            also frozen and thet the recmode is set to active 
3293         */
3294         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3295                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3296                 if (ret != 0) {
3297                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3298                 }
3299                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3300                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3301
3302                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3303                         if (ret != 0) {
3304                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3305                                 return;
3306                         }
3307                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3308                         if (ret != 0) {
3309                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3310
3311                                 return;
3312                         }
3313                         return;
3314                 }
3315         }
3316         /* If the local node is stopped, verify we are not the recmaster 
3317            and yield this role if so
3318         */
3319         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3320                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3321                 force_election(rec, pnn, nodemap);
3322                 return;
3323         }
3324         
3325         /*
3326          * if the current recmaster do not have CTDB_CAP_RECMASTER,
3327          * but we have force an election and try to become the new
3328          * recmaster
3329          */
3330         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3331             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3332              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3333                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3334                                   " but we (node %u) have - force an election\n",
3335                                   rec->recmaster, pnn));
3336                 force_election(rec, pnn, nodemap);
3337                 return;
3338         }
3339
3340         /* check that we (recovery daemon) and the local ctdb daemon
3341            agrees on whether we are banned or not
3342         */
3343 //qqq
3344
3345         /* remember our own node flags */
3346         rec->node_flags = nodemap->nodes[pnn].flags;
3347
3348         /* count how many active nodes there are */
3349         rec->num_active    = 0;
3350         rec->num_connected = 0;
3351         for (i=0; i<nodemap->num; i++) {
3352                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3353                         rec->num_active++;
3354                 }
3355                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3356                         rec->num_connected++;
3357                 }
3358         }
3359
3360
3361         /* verify that the recmaster node is still active */
3362         for (j=0; j<nodemap->num; j++) {
3363                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3364                         break;
3365                 }
3366         }
3367
3368         if (j == nodemap->num) {
3369                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3370                 force_election(rec, pnn, nodemap);
3371                 return;
3372         }
3373
3374         /* if recovery master is disconnected we must elect a new recmaster */
3375         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3376                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3377                 force_election(rec, pnn, nodemap);
3378                 return;
3379         }
3380
3381         /* grap the nodemap from the recovery master to check if it is banned */
3382         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3383                                    mem_ctx, &recmaster_nodemap);
3384         if (ret != 0) {
3385                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3386                           nodemap->nodes[j].pnn));
3387                 return;
3388         }
3389
3390
3391         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3392                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3393                 force_election(rec, pnn, nodemap);
3394                 return;
3395         }
3396
3397
3398         /* verify that we have all ip addresses we should have and we dont
3399          * have addresses we shouldnt have.
3400          */ 
3401         if (ctdb->tunable.disable_ip_failover == 0) {
3402                 if (rec->ip_check_disable_ctx == NULL) {
3403                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3404                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3405                         }
3406                 }
3407         }
3408
3409
3410         /* if we are not the recmaster then we do not need to check
3411            if recovery is needed
3412          */
3413         if (pnn != rec->recmaster) {
3414                 return;
3415         }
3416
3417
3418         /* ensure our local copies of flags are right */
3419         ret = update_local_flags(rec, nodemap);
3420         if (ret == MONITOR_ELECTION_NEEDED) {
3421                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3422                 force_election(rec, pnn, nodemap);
3423                 return;
3424         }
3425         if (ret != MONITOR_OK) {
3426                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3427                 return;
3428         }
3429
3430         if (ctdb->num_nodes != nodemap->num) {
3431                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3432                 reload_nodes_file(ctdb);
3433                 return;
3434         }
3435
3436         /* verify that all active nodes agree that we are the recmaster */
3437         switch (verify_recmaster(rec, nodemap, pnn)) {
3438         case MONITOR_RECOVERY_NEEDED:
3439                 /* can not happen */
3440                 return;
3441         case MONITOR_ELECTION_NEEDED:
3442                 force_election(rec, pnn, nodemap);
3443                 return;
3444         case MONITOR_OK:
3445                 break;
3446         case MONITOR_FAILED:
3447                 return;
3448         }
3449
3450
3451         if (rec->need_recovery) {
3452                 /* a previous recovery didn't finish */
3453                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3454                 return;
3455         }
3456
3457         /* verify that all active nodes are in normal mode 
3458            and not in recovery mode 
3459         */
3460         switch (verify_recmode(ctdb, nodemap)) {
3461         case MONITOR_RECOVERY_NEEDED:
3462                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3463                 return;
3464         case MONITOR_FAILED:
3465                 return;
3466         case MONITOR_ELECTION_NEEDED:
3467                 /* can not happen */
3468         case MONITOR_OK:
3469                 break;
3470         }
3471
3472
3473         if (ctdb->tunable.verify_recovery_lock != 0) {
3474                 /* we should have the reclock - check its not stale */
3475                 ret = check_recovery_lock(ctdb);
3476                 if (ret != 0) {
3477                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3478                         ctdb_set_culprit(rec, ctdb->pnn);
3479                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3480                         return;
3481                 }
3482         }
3483
3484
3485         /* is there a pending reload all ips ? */
3486         if (reload_all_ips_request != NULL) {
3487                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3488                 talloc_free(reload_all_ips_request);
3489                 reload_all_ips_request = NULL;
3490         }
3491
3492         /* if there are takeovers requested, perform it and notify the waiters */
3493         if (rec->reallocate_callers) {
3494                 process_ipreallocate_requests(ctdb, rec);
3495         }
3496
3497         /* get the nodemap for all active remote nodes
3498          */
3499         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3500         if (remote_nodemaps == NULL) {
3501                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3502                 return;
3503         }
3504         for(i=0; i<nodemap->num; i++) {
3505                 remote_nodemaps[i] = NULL;
3506         }
3507         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3508                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3509                 return;
3510         } 
3511
3512         /* verify that all other nodes have the same nodemap as we have
3513         */
3514         for (j=0; j<nodemap->num; j++) {
3515                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3516                         continue;
3517                 }
3518
3519                 if (remote_nodemaps[j] == NULL) {
3520                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3521                         ctdb_set_culprit(rec, j);
3522
3523                         return;
3524                 }
3525
3526                 /* if the nodes disagree on how many nodes there are
3527                    then this is a good reason to try recovery
3528                  */
3529                 if (remote_nodemaps[j]->num != nodemap->num) {
3530                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3531                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3532                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3533                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3534                         return;
3535                 }
3536
3537                 /* if the nodes disagree on which nodes exist and are
3538                    active, then that is also a good reason to do recovery
3539                  */
3540                 for (i=0;i<nodemap->num;i++) {
3541                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3542                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3543                                           nodemap->nodes[j].pnn, i, 
3544                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3545                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3546                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3547                                             vnnmap);
3548                                 return;
3549                         }
3550                 }
3551
3552                 /* verify the flags are consistent
3553                 */
3554                 for (i=0; i<nodemap->num; i++) {
3555                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3556                                 continue;
3557                         }
3558                         
3559                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3560                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3561                                   nodemap->nodes[j].pnn, 
3562                                   nodemap->nodes[i].pnn, 
3563                                   remote_nodemaps[j]->nodes[i].flags,
3564                                   nodemap->nodes[j].flags));
3565                                 if (i == j) {
3566                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3567                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3568                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3569                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3570                                                     vnnmap);
3571                                         return;
3572                                 } else {
3573                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3574                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3575                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3576                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3577                                                     vnnmap);
3578                                         return;
3579                                 }
3580                         }
3581                 }
3582         }
3583
3584
3585         /* there better be the same number of lmasters in the vnn map
3586            as there are active nodes or we will have to do a recovery
3587          */
3588         if (vnnmap->size != rec->num_active) {
3589                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3590                           vnnmap->size, rec->num_active));
3591                 ctdb_set_culprit(rec, ctdb->pnn);
3592                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3593                 return;
3594         }
3595
3596         /* verify that all active nodes in the nodemap also exist in 
3597            the vnnmap.
3598          */
3599         for (j=0; j<nodemap->num; j++) {
3600                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3601                         continue;
3602                 }
3603                 if (nodemap->nodes[j].pnn == pnn) {
3604                         continue;
3605                 }
3606
3607                 for (i=0; i<vnnmap->size; i++) {
3608                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3609                                 break;
3610                         }
3611                 }
3612                 if (i == vnnmap->size) {
3613                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3614                                   nodemap->nodes[j].pnn));
3615                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3616                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3617                         return;
3618                 }
3619         }
3620
3621         
3622         /* verify that all other nodes have the same vnnmap
3623            and are from the same generation
3624          */
3625         for (j=0; j<nodemap->num; j++) {
3626                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3627                         continue;
3628                 }
3629                 if (nodemap->nodes[j].pnn == pnn) {
3630                         continue;
3631                 }
3632
3633                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3634                                           mem_ctx, &remote_vnnmap);
3635                 if (ret != 0) {
3636                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3637                                   nodemap->nodes[j].pnn));
3638                         return;
3639                 }
3640
3641                 /* verify the vnnmap generation is the same */
3642                 if (vnnmap->generation != remote_vnnmap->generation) {
3643                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3644                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3645                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3646                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3647                         return;
3648                 }
3649
3650                 /* verify the vnnmap size is the same */
3651                 if (vnnmap->size != remote_vnnmap->size) {
3652                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3653                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3654                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3655                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3656                         return;
3657                 }
3658
3659                 /* verify the vnnmap is the same */
3660                 for (i=0;i<vnnmap->size;i++) {
3661                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3662                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3663                                           nodemap->nodes[j].pnn));
3664                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3665                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3666                                             vnnmap);
3667                                 return;
3668                         }
3669                 }
3670         }
3671
3672         /* we might need to change who has what IP assigned */
3673         if (rec->need_takeover_run) {
3674                 uint32_t culprit = (uint32_t)-1;
3675
3676                 rec->need_takeover_run = false;
3677
3678                 /* update the list of public ips that a node can handle for
3679                    all connected nodes
3680                 */
3681                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3682                 if (ret != 0) {
3683                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3684                                          culprit));
3685                         rec->need_takeover_run = true;
3686                         return;
3687                 }
3688
3689                 /* execute the "startrecovery" event script on all nodes */
3690                 ret = run_startrecovery_eventscript(rec, nodemap);
3691                 if (ret!=0) {
3692                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3693                         ctdb_set_culprit(rec, ctdb->pnn);
3694                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3695                         return;
3696                 }
3697
3698                 ret = ctdb_takeover_run(ctdb, nodemap);
3699                 if (ret != 0) {
3700                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3701                         return;
3702                 }
3703
3704                 /* execute the "recovered" event script on all nodes */
3705                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3706 #if 0
3707 // we cant check whether the event completed successfully
3708 // since this script WILL fail if the node is in recovery mode
3709 // and if that race happens, the code here would just cause a second
3710 // cascading recovery.
3711                 if (ret!=0) {
3712                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3713                         ctdb_set_culprit(rec, ctdb->pnn);
3714                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3715                 }
3716 #endif
3717         }
3718 }
3719
3720 /*
3721   the main monitoring loop
3722  */
3723 static void monitor_cluster(struct ctdb_context *ctdb)
3724 {
3725         struct ctdb_recoverd *rec;
3726
3727         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3728
3729         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3730         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3731
3732         rec->ctdb = ctdb;
3733
3734         rec->priority_time = timeval_current();
3735
3736         /* register a message port for sending memory dumps */
3737         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3738
3739         /* register a message port for recovery elections */
3740         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3741
3742         /* when nodes are disabled/enabled */
3743         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3744
3745         /* when we are asked to puch out a flag change */
3746         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3747
3748         /* register a message port for vacuum fetch */
3749         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3750
3751         /* register a message port for reloadnodes  */
3752         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3753
3754         /* register a message port for performing a takeover run */
3755         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3756
3757         /* register a message port for performing a reload all ips */
3758         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3759
3760         /* register a message port for disabling the ip check for a short while */
3761         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3762
3763         /* register a message port for updating the recovery daemons node assignment for an ip */
3764         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3765
3766         /* register a message port for forcing a rebalance of a node next
3767            reallocation */
3768         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3769
3770         for (;;) {
3771                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3772                 struct timeval start;
3773                 double elapsed;
3774
3775                 if (!mem_ctx) {
3776                         DEBUG(DEBUG_CRIT,(__location__
3777                                           " Failed to create temp context\n"));
3778                         exit(-1);
3779                 }
3780
3781                 start = timeval_current();
3782                 main_loop(ctdb, rec, mem_ctx);
3783                 talloc_free(mem_ctx);
3784
3785                 /* we only check for recovery once every second */
3786                 elapsed = timeval_elapsed(&start);
3787                 if (elapsed < ctdb->tunable.recover_interval) {
3788                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3789                                           - elapsed);
3790                 }
3791         }
3792 }
3793
3794 /*
3795   event handler for when the main ctdbd dies
3796  */
3797 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3798                                  uint16_t flags, void *private_data)
3799 {
3800         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3801         _exit(1);
3802 }
3803
3804 /*
3805   called regularly to verify that the recovery daemon is still running
3806  */
3807 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3808                               struct timeval yt, void *p)
3809 {
3810         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3811
3812         if (kill(ctdb->recoverd_pid, 0) != 0) {
3813                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3814
3815                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3816                                 ctdb_restart_recd, ctdb);
3817
3818                 return;
3819         }
3820
3821         event_add_timed(ctdb->ev, ctdb, 
3822                         timeval_current_ofs(30, 0),
3823                         ctdb_check_recd, ctdb);
3824 }
3825
3826 static void recd_sig_child_handler(struct event_context *ev,
3827         struct signal_event *se, int signum, int count,
3828         void *dont_care, 
3829         void *private_data)
3830 {
3831 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3832         int status;
3833         pid_t pid = -1;
3834
3835         while (pid != 0) {
3836                 pid = waitpid(-1, &status, WNOHANG);
3837                 if (pid == -1) {
3838                         if (errno != ECHILD) {
3839                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3840                         }
3841                         return;
3842                 }
3843                 if (pid > 0) {
3844                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3845                 }
3846         }
3847 }
3848
3849 /*
3850   startup the recovery daemon as a child of the main ctdb daemon
3851  */
3852 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3853 {
3854         int fd[2];
3855         struct signal_event *se;
3856         struct tevent_fd *fde;
3857
3858         if (pipe(fd) != 0) {
3859                 return -1;
3860         }
3861
3862         ctdb->ctdbd_pid = getpid();
3863
3864         ctdb->recoverd_pid = fork();
3865         if (ctdb->recoverd_pid == -1) {
3866                 return -1;
3867         }
3868         
3869         if (ctdb->recoverd_pid != 0) {
3870                 close(fd[0]);
3871                 event_add_timed(ctdb->ev, ctdb, 
3872                                 timeval_current_ofs(30, 0),
3873                                 ctdb_check_recd, ctdb);
3874                 return 0;
3875         }
3876
3877         close(fd[1]);
3878
3879         srandom(getpid() ^ time(NULL));
3880
3881         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3882                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3883                 exit(1);
3884         }
3885
3886         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3887
3888         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3889                      ctdb_recoverd_parent, &fd[0]);     
3890         tevent_fd_set_auto_close(fde);
3891
3892         /* set up a handler to pick up sigchld */
3893         se = event_add_signal(ctdb->ev, ctdb,
3894                                      SIGCHLD, 0,
3895                                      recd_sig_child_handler,
3896                                      ctdb);
3897         if (se == NULL) {
3898                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3899                 exit(1);
3900         }
3901
3902         monitor_cluster(ctdb);
3903
3904         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3905         return -1;
3906 }
3907
3908 /*
3909   shutdown the recovery daemon
3910  */
3911 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3912 {
3913         if (ctdb->recoverd_pid == 0) {
3914                 return;
3915         }
3916
3917         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3918         kill(ctdb->recoverd_pid, SIGTERM);
3919 }
3920
3921 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3922                        struct timeval t, void *private_data)
3923 {
3924         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3925
3926         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3927         ctdb_stop_recoverd(ctdb);
3928         ctdb_start_recoverd(ctdb);
3929 }