recoverd: Don't continue if the current node gets banned
[amitay/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         if (!ctdb_validate_pnn(ctdb, pnn)) {
90                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
91                 return;
92         }
93
94         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit > ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         /* If we are banned or stopped, do not set other nodes as culprits */
124         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
125                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
126                 return;
127         }
128
129         if (ctdb->nodes[culprit]->ban_state == NULL) {
130                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
131                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
132
133                 
134         }
135         ban_state = ctdb->nodes[culprit]->ban_state;
136         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
137                 /* this was the first time in a long while this node
138                    misbehaved so we will forgive any old transgressions.
139                 */
140                 ban_state->count = 0;
141         }
142
143         ban_state->count += count;
144         ban_state->last_reported_time = timeval_current();
145         rec->last_culprit_node = culprit;
146 }
147
148 /*
149   remember the trouble maker
150  */
151 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
152 {
153         ctdb_set_culprit_count(rec, culprit, 1);
154 }
155
156
157 /* this callback is called for every node that failed to execute the
158    recovered event
159 */
160 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
161 {
162         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
163
164         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
165
166         ctdb_set_culprit(rec, node_pnn);
167 }
168
169 /*
170   run the "recovered" eventscript on all nodes
171  */
172 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
173 {
174         TALLOC_CTX *tmp_ctx;
175         uint32_t *nodes;
176         struct ctdb_context *ctdb = rec->ctdb;
177
178         tmp_ctx = talloc_new(ctdb);
179         CTDB_NO_MEMORY(ctdb, tmp_ctx);
180
181         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
182         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
183                                         nodes, 0,
184                                         CONTROL_TIMEOUT(), false, tdb_null,
185                                         NULL, recovered_fail_callback,
186                                         rec) != 0) {
187                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
188
189                 talloc_free(tmp_ctx);
190                 return -1;
191         }
192
193         talloc_free(tmp_ctx);
194         return 0;
195 }
196
197 /* this callback is called for every node that failed to execute the
198    start recovery event
199 */
200 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
201 {
202         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
203
204         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
205
206         ctdb_set_culprit(rec, node_pnn);
207 }
208
209 /*
210   run the "startrecovery" eventscript on all nodes
211  */
212 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
213 {
214         TALLOC_CTX *tmp_ctx;
215         uint32_t *nodes;
216         struct ctdb_context *ctdb = rec->ctdb;
217
218         tmp_ctx = talloc_new(ctdb);
219         CTDB_NO_MEMORY(ctdb, tmp_ctx);
220
221         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
222         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
223                                         nodes, 0,
224                                         CONTROL_TIMEOUT(), false, tdb_null,
225                                         NULL,
226                                         startrecovery_fail_callback,
227                                         rec) != 0) {
228                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
229                 talloc_free(tmp_ctx);
230                 return -1;
231         }
232
233         talloc_free(tmp_ctx);
234         return 0;
235 }
236
237 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
238 {
239         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
240                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
241                 return;
242         }
243         if (node_pnn < ctdb->num_nodes) {
244                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
245         }
246
247         if (node_pnn == ctdb->pnn) {
248                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
249         }
250 }
251
252 /*
253   update the node capabilities for all connected nodes
254  */
255 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
256 {
257         uint32_t *nodes;
258         TALLOC_CTX *tmp_ctx;
259
260         tmp_ctx = talloc_new(ctdb);
261         CTDB_NO_MEMORY(ctdb, tmp_ctx);
262
263         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
264         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
265                                         nodes, 0,
266                                         CONTROL_TIMEOUT(),
267                                         false, tdb_null,
268                                         async_getcap_callback, NULL,
269                                         NULL) != 0) {
270                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
271                 talloc_free(tmp_ctx);
272                 return -1;
273         }
274
275         talloc_free(tmp_ctx);
276         return 0;
277 }
278
279 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
280 {
281         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
282
283         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
284         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
285 }
286
287 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
288 {
289         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
290
291         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
292         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
293 }
294
295 /*
296   change recovery mode on all nodes
297  */
298 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
299 {
300         TDB_DATA data;
301         uint32_t *nodes;
302         TALLOC_CTX *tmp_ctx;
303
304         tmp_ctx = talloc_new(ctdb);
305         CTDB_NO_MEMORY(ctdb, tmp_ctx);
306
307         /* freeze all nodes */
308         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
309         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
310                 int i;
311
312                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
313                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
314                                                 nodes, i,
315                                                 CONTROL_TIMEOUT(),
316                                                 false, tdb_null,
317                                                 NULL,
318                                                 set_recmode_fail_callback,
319                                                 rec) != 0) {
320                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
321                                 talloc_free(tmp_ctx);
322                                 return -1;
323                         }
324                 }
325         }
326
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&rec_mode;
330
331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
332                                         nodes, 0,
333                                         CONTROL_TIMEOUT(),
334                                         false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /*
347   change recovery master on all node
348  */
349 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
350 {
351         TDB_DATA data;
352         TALLOC_CTX *tmp_ctx;
353         uint32_t *nodes;
354
355         tmp_ctx = talloc_new(ctdb);
356         CTDB_NO_MEMORY(ctdb, tmp_ctx);
357
358         data.dsize = sizeof(uint32_t);
359         data.dptr = (unsigned char *)&pnn;
360
361         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
362         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
363                                         nodes, 0,
364                                         CONTROL_TIMEOUT(), false, data,
365                                         NULL, NULL,
366                                         NULL) != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
368                 talloc_free(tmp_ctx);
369                 return -1;
370         }
371
372         talloc_free(tmp_ctx);
373         return 0;
374 }
375
376 /* update all remote nodes to use the same db priority that we have
377    this can fail if the remove node has not yet been upgraded to 
378    support this function, so we always return success and never fail
379    a recovery if this call fails.
380 */
381 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
382         struct ctdb_node_map *nodemap, 
383         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
384 {
385         int db;
386         uint32_t *nodes;
387
388         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
389
390         /* step through all local databases */
391         for (db=0; db<dbmap->num;db++) {
392                 TDB_DATA data;
393                 struct ctdb_db_priority db_prio;
394                 int ret;
395
396                 db_prio.db_id     = dbmap->dbs[db].dbid;
397                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
398                 if (ret != 0) {
399                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
400                         continue;
401                 }
402
403                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
404
405                 data.dptr  = (uint8_t *)&db_prio;
406                 data.dsize = sizeof(db_prio);
407
408                 if (ctdb_client_async_control(ctdb,
409                                         CTDB_CONTROL_SET_DB_PRIORITY,
410                                         nodes, 0,
411                                         CONTROL_TIMEOUT(), false, data,
412                                         NULL, NULL,
413                                         NULL) != 0) {
414                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
415                 }
416         }
417
418         return 0;
419 }                       
420
421 /*
422   ensure all other nodes have attached to any databases that we have
423  */
424 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
425                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
426 {
427         int i, j, db, ret;
428         struct ctdb_dbid_map *remote_dbmap;
429
430         /* verify that all other nodes have all our databases */
431         for (j=0; j<nodemap->num; j++) {
432                 /* we dont need to ourself ourselves */
433                 if (nodemap->nodes[j].pnn == pnn) {
434                         continue;
435                 }
436                 /* dont check nodes that are unavailable */
437                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
438                         continue;
439                 }
440
441                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                          mem_ctx, &remote_dbmap);
443                 if (ret != 0) {
444                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
445                         return -1;
446                 }
447
448                 /* step through all local databases */
449                 for (db=0; db<dbmap->num;db++) {
450                         const char *name;
451
452
453                         for (i=0;i<remote_dbmap->num;i++) {
454                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
455                                         break;
456                                 }
457                         }
458                         /* the remote node already have this database */
459                         if (i!=remote_dbmap->num) {
460                                 continue;
461                         }
462                         /* ok so we need to create this database */
463                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
464                                             mem_ctx, &name);
465                         if (ret != 0) {
466                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
467                                 return -1;
468                         }
469                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
470                                            mem_ctx, name,
471                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
474                                 return -1;
475                         }
476                 }
477         }
478
479         return 0;
480 }
481
482
483 /*
484   ensure we are attached to any databases that anyone else is attached to
485  */
486 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
487                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
488 {
489         int i, j, db, ret;
490         struct ctdb_dbid_map *remote_dbmap;
491
492         /* verify that we have all database any other node has */
493         for (j=0; j<nodemap->num; j++) {
494                 /* we dont need to ourself ourselves */
495                 if (nodemap->nodes[j].pnn == pnn) {
496                         continue;
497                 }
498                 /* dont check nodes that are unavailable */
499                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
500                         continue;
501                 }
502
503                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
504                                          mem_ctx, &remote_dbmap);
505                 if (ret != 0) {
506                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
507                         return -1;
508                 }
509
510                 /* step through all databases on the remote node */
511                 for (db=0; db<remote_dbmap->num;db++) {
512                         const char *name;
513
514                         for (i=0;i<(*dbmap)->num;i++) {
515                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
516                                         break;
517                                 }
518                         }
519                         /* we already have this db locally */
520                         if (i!=(*dbmap)->num) {
521                                 continue;
522                         }
523                         /* ok so we need to create this database and
524                            rebuild dbmap
525                          */
526                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
527                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
528                         if (ret != 0) {
529                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
530                                           nodemap->nodes[j].pnn));
531                                 return -1;
532                         }
533                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
534                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
535                         if (ret != 0) {
536                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
537                                 return -1;
538                         }
539                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
542                                 return -1;
543                         }
544                 }
545         }
546
547         return 0;
548 }
549
550
551 /*
552   pull the remote database contents from one node into the recdb
553  */
554 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
555                                     struct tdb_wrap *recdb, uint32_t dbid)
556 {
557         int ret;
558         TDB_DATA outdata;
559         struct ctdb_marshall_buffer *reply;
560         struct ctdb_rec_data *rec;
561         int i;
562         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
563
564         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
565                                CONTROL_TIMEOUT(), &outdata);
566         if (ret != 0) {
567                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
568                 talloc_free(tmp_ctx);
569                 return -1;
570         }
571
572         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
573
574         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
575                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
576                 talloc_free(tmp_ctx);
577                 return -1;
578         }
579         
580         rec = (struct ctdb_rec_data *)&reply->data[0];
581         
582         for (i=0;
583              i<reply->count;
584              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
585                 TDB_DATA key, data;
586                 struct ctdb_ltdb_header *hdr;
587                 TDB_DATA existing;
588                 
589                 key.dptr = &rec->data[0];
590                 key.dsize = rec->keylen;
591                 data.dptr = &rec->data[key.dsize];
592                 data.dsize = rec->datalen;
593                 
594                 hdr = (struct ctdb_ltdb_header *)data.dptr;
595
596                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
597                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
598                         talloc_free(tmp_ctx);
599                         return -1;
600                 }
601
602                 /* fetch the existing record, if any */
603                 existing = tdb_fetch(recdb->tdb, key);
604                 
605                 if (existing.dptr != NULL) {
606                         struct ctdb_ltdb_header header;
607                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
608                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
609                                          (unsigned)existing.dsize, srcnode));
610                                 free(existing.dptr);
611                                 talloc_free(tmp_ctx);
612                                 return -1;
613                         }
614                         header = *(struct ctdb_ltdb_header *)existing.dptr;
615                         free(existing.dptr);
616                         if (!(header.rsn < hdr->rsn ||
617                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
618                                 continue;
619                         }
620                 }
621                 
622                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
623                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
624                         talloc_free(tmp_ctx);
625                         return -1;                              
626                 }
627         }
628
629         talloc_free(tmp_ctx);
630
631         return 0;
632 }
633
634
635 struct pull_seqnum_cbdata {
636         int failed;
637         uint32_t pnn;
638         uint64_t seqnum;
639 };
640
641 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
642 {
643         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
644         uint64_t seqnum;
645
646         if (cb_data->failed != 0) {
647                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
648                 return;
649         }
650
651         if (res != 0) {
652                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
653                 cb_data->failed = 1;
654                 return;
655         }
656
657         if (outdata.dsize != sizeof(uint64_t)) {
658                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
659                 cb_data->failed = -1;
660                 return;
661         }
662
663         seqnum = *((uint64_t *)outdata.dptr);
664
665         if (seqnum > cb_data->seqnum) {
666                 cb_data->seqnum = seqnum;
667                 cb_data->pnn = node_pnn;
668         }
669 }
670
671 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
672 {
673         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
674
675         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
676         cb_data->failed = 1;
677 }
678
679 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
680                                 struct ctdb_recoverd *rec, 
681                                 struct ctdb_node_map *nodemap, 
682                                 struct tdb_wrap *recdb, uint32_t dbid)
683 {
684         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
685         uint32_t *nodes;
686         TDB_DATA data;
687         uint32_t outdata[2];
688         struct pull_seqnum_cbdata *cb_data;
689
690         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
691
692         outdata[0] = dbid;
693         outdata[1] = 0;
694
695         data.dsize = sizeof(outdata);
696         data.dptr  = (uint8_t *)&outdata[0];
697
698         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
699         if (cb_data == NULL) {
700                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
701                 talloc_free(tmp_ctx);
702                 return -1;
703         }
704
705         cb_data->failed = 0;
706         cb_data->pnn    = -1;
707         cb_data->seqnum = 0;
708         
709         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
711                                         nodes, 0,
712                                         CONTROL_TIMEOUT(), false, data,
713                                         pull_seqnum_cb,
714                                         pull_seqnum_fail_cb,
715                                         cb_data) != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
717
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->failed != 0) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
729                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
730                 talloc_free(tmp_ctx);
731                 return -1;
732         }
733
734         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
735
736         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
737                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
738                 talloc_free(tmp_ctx);
739                 return -1;
740         }
741
742         talloc_free(tmp_ctx);
743         return 0;
744 }
745
746
747 /*
748   pull all the remote database contents into the recdb
749  */
750 static int pull_remote_database(struct ctdb_context *ctdb,
751                                 struct ctdb_recoverd *rec, 
752                                 struct ctdb_node_map *nodemap, 
753                                 struct tdb_wrap *recdb, uint32_t dbid,
754                                 bool persistent)
755 {
756         int j;
757
758         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
759                 int ret;
760                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
761                 if (ret == 0) {
762                         return 0;
763                 }
764         }
765
766         /* pull all records from all other nodes across onto this node
767            (this merges based on rsn)
768         */
769         for (j=0; j<nodemap->num; j++) {
770                 /* dont merge from nodes that are unavailable */
771                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
772                         continue;
773                 }
774                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
775                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
776                                  nodemap->nodes[j].pnn));
777                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
778                         return -1;
779                 }
780         }
781         
782         return 0;
783 }
784
785
786 /*
787   update flags on all active nodes
788  */
789 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
790 {
791         int ret;
792
793         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
794                 if (ret != 0) {
795                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
796                 return -1;
797         }
798
799         return 0;
800 }
801
802 /*
803   ensure all nodes have the same vnnmap we do
804  */
805 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
806                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
807 {
808         int j, ret;
809
810         /* push the new vnn map out to all the nodes */
811         for (j=0; j<nodemap->num; j++) {
812                 /* dont push to nodes that are unavailable */
813                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
814                         continue;
815                 }
816
817                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
818                 if (ret != 0) {
819                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
820                         return -1;
821                 }
822         }
823
824         return 0;
825 }
826
827
828 struct vacuum_info {
829         struct vacuum_info *next, *prev;
830         struct ctdb_recoverd *rec;
831         uint32_t srcnode;
832         struct ctdb_db_context *ctdb_db;
833         struct ctdb_marshall_buffer *recs;
834         struct ctdb_rec_data *r;
835 };
836
837 static void vacuum_fetch_next(struct vacuum_info *v);
838
839 /*
840   called when a vacuum fetch has completed - just free it and do the next one
841  */
842 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
843 {
844         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
845         talloc_free(state);
846         vacuum_fetch_next(v);
847 }
848
849
850 /*
851   process the next element from the vacuum list
852 */
853 static void vacuum_fetch_next(struct vacuum_info *v)
854 {
855         struct ctdb_call call;
856         struct ctdb_rec_data *r;
857
858         while (v->recs->count) {
859                 struct ctdb_client_call_state *state;
860                 TDB_DATA data;
861                 struct ctdb_ltdb_header *hdr;
862
863                 ZERO_STRUCT(call);
864                 call.call_id = CTDB_NULL_FUNC;
865                 call.flags = CTDB_IMMEDIATE_MIGRATION;
866                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
867
868                 r = v->r;
869                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
870                 v->recs->count--;
871
872                 call.key.dptr = &r->data[0];
873                 call.key.dsize = r->keylen;
874
875                 /* ensure we don't block this daemon - just skip a record if we can't get
876                    the chainlock */
877                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
878                         continue;
879                 }
880
881                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
882                 if (data.dptr == NULL) {
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886
887                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
888                         free(data.dptr);
889                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
890                         continue;
891                 }
892                 
893                 hdr = (struct ctdb_ltdb_header *)data.dptr;
894                 if (hdr->dmaster == v->rec->ctdb->pnn) {
895                         /* its already local */
896                         free(data.dptr);
897                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
898                         continue;
899                 }
900
901                 free(data.dptr);
902
903                 state = ctdb_call_send(v->ctdb_db, &call);
904                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
905                 if (state == NULL) {
906                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
907                         talloc_free(v);
908                         return;
909                 }
910                 state->async.fn = vacuum_fetch_callback;
911                 state->async.private_data = v;
912                 return;
913         }
914
915         talloc_free(v);
916 }
917
918
919 /*
920   destroy a vacuum info structure
921  */
922 static int vacuum_info_destructor(struct vacuum_info *v)
923 {
924         DLIST_REMOVE(v->rec->vacuum_info, v);
925         return 0;
926 }
927
928
929 /*
930   handler for vacuum fetch
931 */
932 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
933                                  TDB_DATA data, void *private_data)
934 {
935         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
936         struct ctdb_marshall_buffer *recs;
937         int ret, i;
938         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
939         const char *name;
940         struct ctdb_dbid_map *dbmap=NULL;
941         bool persistent = false;
942         struct ctdb_db_context *ctdb_db;
943         struct ctdb_rec_data *r;
944         uint32_t srcnode;
945         struct vacuum_info *v;
946
947         recs = (struct ctdb_marshall_buffer *)data.dptr;
948         r = (struct ctdb_rec_data *)&recs->data[0];
949
950         if (recs->count == 0) {
951                 talloc_free(tmp_ctx);
952                 return;
953         }
954
955         srcnode = r->reqid;
956
957         for (v=rec->vacuum_info;v;v=v->next) {
958                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
959                         /* we're already working on records from this node */
960                         talloc_free(tmp_ctx);
961                         return;
962                 }
963         }
964
965         /* work out if the database is persistent */
966         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
967         if (ret != 0) {
968                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         for (i=0;i<dbmap->num;i++) {
974                 if (dbmap->dbs[i].dbid == recs->db_id) {
975                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
976                         break;
977                 }
978         }
979         if (i == dbmap->num) {
980                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
981                 talloc_free(tmp_ctx);
982                 return;         
983         }
984
985         /* find the name of this database */
986         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
987                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
988                 talloc_free(tmp_ctx);
989                 return;
990         }
991
992         /* attach to it */
993         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
994         if (ctdb_db == NULL) {
995                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
996                 talloc_free(tmp_ctx);
997                 return;
998         }
999
1000         v = talloc_zero(rec, struct vacuum_info);
1001         if (v == NULL) {
1002                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1003                 talloc_free(tmp_ctx);
1004                 return;
1005         }
1006
1007         v->rec = rec;
1008         v->srcnode = srcnode;
1009         v->ctdb_db = ctdb_db;
1010         v->recs = talloc_memdup(v, recs, data.dsize);
1011         if (v->recs == NULL) {
1012                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1013                 talloc_free(v);
1014                 talloc_free(tmp_ctx);
1015                 return;         
1016         }
1017         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1018
1019         DLIST_ADD(rec->vacuum_info, v);
1020
1021         talloc_set_destructor(v, vacuum_info_destructor);
1022
1023         vacuum_fetch_next(v);
1024         talloc_free(tmp_ctx);
1025 }
1026
1027
1028 /*
1029   called when ctdb_wait_timeout should finish
1030  */
1031 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1032                               struct timeval yt, void *p)
1033 {
1034         uint32_t *timed_out = (uint32_t *)p;
1035         (*timed_out) = 1;
1036 }
1037
1038 /*
1039   wait for a given number of seconds
1040  */
1041 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1042 {
1043         uint32_t timed_out = 0;
1044         time_t usecs = (secs - (time_t)secs) * 1000000;
1045         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1046         while (!timed_out) {
1047                 event_loop_once(ctdb->ev);
1048         }
1049 }
1050
1051 /*
1052   called when an election times out (ends)
1053  */
1054 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1055                                   struct timeval t, void *p)
1056 {
1057         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1058         rec->election_timeout = NULL;
1059         fast_start = false;
1060
1061         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1062 }
1063
1064
1065 /*
1066   wait for an election to finish. It finished election_timeout seconds after
1067   the last election packet is received
1068  */
1069 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1070 {
1071         struct ctdb_context *ctdb = rec->ctdb;
1072         while (rec->election_timeout) {
1073                 event_loop_once(ctdb->ev);
1074         }
1075 }
1076
1077 /*
1078   Update our local flags from all remote connected nodes. 
1079   This is only run when we are or we belive we are the recovery master
1080  */
1081 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1082 {
1083         int j;
1084         struct ctdb_context *ctdb = rec->ctdb;
1085         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1086
1087         /* get the nodemap for all active remote nodes and verify
1088            they are the same as for this node
1089          */
1090         for (j=0; j<nodemap->num; j++) {
1091                 struct ctdb_node_map *remote_nodemap=NULL;
1092                 int ret;
1093
1094                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1095                         continue;
1096                 }
1097                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1098                         continue;
1099                 }
1100
1101                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1102                                            mem_ctx, &remote_nodemap);
1103                 if (ret != 0) {
1104                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1105                                   nodemap->nodes[j].pnn));
1106                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1107                         talloc_free(mem_ctx);
1108                         return MONITOR_FAILED;
1109                 }
1110                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1111                         /* We should tell our daemon about this so it
1112                            updates its flags or else we will log the same 
1113                            message again in the next iteration of recovery.
1114                            Since we are the recovery master we can just as
1115                            well update the flags on all nodes.
1116                         */
1117                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1118                         if (ret != 0) {
1119                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1120                                 return -1;
1121                         }
1122
1123                         /* Update our local copy of the flags in the recovery
1124                            daemon.
1125                         */
1126                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1127                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1128                                  nodemap->nodes[j].flags));
1129                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1130                 }
1131                 talloc_free(remote_nodemap);
1132         }
1133         talloc_free(mem_ctx);
1134         return MONITOR_OK;
1135 }
1136
1137
1138 /* Create a new random generation ip. 
1139    The generation id can not be the INVALID_GENERATION id
1140 */
1141 static uint32_t new_generation(void)
1142 {
1143         uint32_t generation;
1144
1145         while (1) {
1146                 generation = random();
1147
1148                 if (generation != INVALID_GENERATION) {
1149                         break;
1150                 }
1151         }
1152
1153         return generation;
1154 }
1155
1156
1157 /*
1158   create a temporary working database
1159  */
1160 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1161 {
1162         char *name;
1163         struct tdb_wrap *recdb;
1164         unsigned tdb_flags;
1165
1166         /* open up the temporary recovery database */
1167         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1168                                ctdb->db_directory_state,
1169                                ctdb->pnn);
1170         if (name == NULL) {
1171                 return NULL;
1172         }
1173         unlink(name);
1174
1175         tdb_flags = TDB_NOLOCK;
1176         if (ctdb->valgrinding) {
1177                 tdb_flags |= TDB_NOMMAP;
1178         }
1179         tdb_flags |= TDB_DISALLOW_NESTING;
1180
1181         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1182                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1183         if (recdb == NULL) {
1184                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1185         }
1186
1187         talloc_free(name);
1188
1189         return recdb;
1190 }
1191
1192
1193 /* 
1194    a traverse function for pulling all relevant records from recdb
1195  */
1196 struct recdb_data {
1197         struct ctdb_context *ctdb;
1198         struct ctdb_marshall_buffer *recdata;
1199         uint32_t len;
1200         uint32_t allocated_len;
1201         bool failed;
1202         bool persistent;
1203 };
1204
1205 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1206 {
1207         struct recdb_data *params = (struct recdb_data *)p;
1208         struct ctdb_rec_data *rec;
1209         struct ctdb_ltdb_header *hdr;
1210
1211         /*
1212          * skip empty records - but NOT for persistent databases:
1213          *
1214          * The record-by-record mode of recovery deletes empty records.
1215          * For persistent databases, this can lead to data corruption
1216          * by deleting records that should be there:
1217          *
1218          * - Assume the cluster has been running for a while.
1219          *
1220          * - A record R in a persistent database has been created and
1221          *   deleted a couple of times, the last operation being deletion,
1222          *   leaving an empty record with a high RSN, say 10.
1223          *
1224          * - Now a node N is turned off.
1225          *
1226          * - This leaves the local database copy of D on N with the empty
1227          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1228          *   the copy of record R.
1229          *
1230          * - Now the record is created again while node N is turned off.
1231          *   This creates R with RSN = 1 on all nodes except for N.
1232          *
1233          * - Now node N is turned on again. The following recovery will chose
1234          *   the older empty copy of R due to RSN 10 > RSN 1.
1235          *
1236          * ==> Hence the record is gone after the recovery.
1237          *
1238          * On databases like Samba's registry, this can damage the higher-level
1239          * data structures built from the various tdb-level records.
1240          */
1241         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1242                 return 0;
1243         }
1244
1245         /* update the dmaster field to point to us */
1246         hdr = (struct ctdb_ltdb_header *)data.dptr;
1247         if (!params->persistent) {
1248                 hdr->dmaster = params->ctdb->pnn;
1249                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1250         }
1251
1252         /* add the record to the blob ready to send to the nodes */
1253         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1254         if (rec == NULL) {
1255                 params->failed = true;
1256                 return -1;
1257         }
1258         if (params->len + rec->length >= params->allocated_len) {
1259                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1260                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1261         }
1262         if (params->recdata == NULL) {
1263                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1264                          rec->length + params->len, params->recdata->count));
1265                 params->failed = true;
1266                 return -1;
1267         }
1268         params->recdata->count++;
1269         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1270         params->len += rec->length;
1271         talloc_free(rec);
1272
1273         return 0;
1274 }
1275
1276 /*
1277   push the recdb database out to all nodes
1278  */
1279 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1280                                bool persistent,
1281                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1282 {
1283         struct recdb_data params;
1284         struct ctdb_marshall_buffer *recdata;
1285         TDB_DATA outdata;
1286         TALLOC_CTX *tmp_ctx;
1287         uint32_t *nodes;
1288
1289         tmp_ctx = talloc_new(ctdb);
1290         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1291
1292         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1293         CTDB_NO_MEMORY(ctdb, recdata);
1294
1295         recdata->db_id = dbid;
1296
1297         params.ctdb = ctdb;
1298         params.recdata = recdata;
1299         params.len = offsetof(struct ctdb_marshall_buffer, data);
1300         params.allocated_len = params.len;
1301         params.failed = false;
1302         params.persistent = persistent;
1303
1304         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1305                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1306                 talloc_free(params.recdata);
1307                 talloc_free(tmp_ctx);
1308                 return -1;
1309         }
1310
1311         if (params.failed) {
1312                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1313                 talloc_free(params.recdata);
1314                 talloc_free(tmp_ctx);
1315                 return -1;              
1316         }
1317
1318         recdata = params.recdata;
1319
1320         outdata.dptr = (void *)recdata;
1321         outdata.dsize = params.len;
1322
1323         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1324         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1325                                         nodes, 0,
1326                                         CONTROL_TIMEOUT(), false, outdata,
1327                                         NULL, NULL,
1328                                         NULL) != 0) {
1329                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1330                 talloc_free(recdata);
1331                 talloc_free(tmp_ctx);
1332                 return -1;
1333         }
1334
1335         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1336                   dbid, recdata->count));
1337
1338         talloc_free(recdata);
1339         talloc_free(tmp_ctx);
1340
1341         return 0;
1342 }
1343
1344
1345 /*
1346   go through a full recovery on one database 
1347  */
1348 static int recover_database(struct ctdb_recoverd *rec, 
1349                             TALLOC_CTX *mem_ctx,
1350                             uint32_t dbid,
1351                             bool persistent,
1352                             uint32_t pnn, 
1353                             struct ctdb_node_map *nodemap,
1354                             uint32_t transaction_id)
1355 {
1356         struct tdb_wrap *recdb;
1357         int ret;
1358         struct ctdb_context *ctdb = rec->ctdb;
1359         TDB_DATA data;
1360         struct ctdb_control_wipe_database w;
1361         uint32_t *nodes;
1362
1363         recdb = create_recdb(ctdb, mem_ctx);
1364         if (recdb == NULL) {
1365                 return -1;
1366         }
1367
1368         /* pull all remote databases onto the recdb */
1369         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1370         if (ret != 0) {
1371                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1372                 return -1;
1373         }
1374
1375         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1376
1377         /* wipe all the remote databases. This is safe as we are in a transaction */
1378         w.db_id = dbid;
1379         w.transaction_id = transaction_id;
1380
1381         data.dptr = (void *)&w;
1382         data.dsize = sizeof(w);
1383
1384         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1386                                         nodes, 0,
1387                                         CONTROL_TIMEOUT(), false, data,
1388                                         NULL, NULL,
1389                                         NULL) != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1391                 talloc_free(recdb);
1392                 return -1;
1393         }
1394         
1395         /* push out the correct database. This sets the dmaster and skips 
1396            the empty records */
1397         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1398         if (ret != 0) {
1399                 talloc_free(recdb);
1400                 return -1;
1401         }
1402
1403         /* all done with this database */
1404         talloc_free(recdb);
1405
1406         return 0;
1407 }
1408
1409 /*
1410   reload the nodes file 
1411 */
1412 static void reload_nodes_file(struct ctdb_context *ctdb)
1413 {
1414         ctdb->nodes = NULL;
1415         ctdb_load_nodes_file(ctdb);
1416 }
1417
1418 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1419                                          struct ctdb_recoverd *rec,
1420                                          struct ctdb_node_map *nodemap,
1421                                          uint32_t *culprit)
1422 {
1423         int j;
1424         int ret;
1425
1426         if (ctdb->num_nodes != nodemap->num) {
1427                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1428                                   ctdb->num_nodes, nodemap->num));
1429                 if (culprit) {
1430                         *culprit = ctdb->pnn;
1431                 }
1432                 return -1;
1433         }
1434
1435         for (j=0; j<nodemap->num; j++) {
1436                 /* release any existing data */
1437                 if (ctdb->nodes[j]->known_public_ips) {
1438                         talloc_free(ctdb->nodes[j]->known_public_ips);
1439                         ctdb->nodes[j]->known_public_ips = NULL;
1440                 }
1441                 if (ctdb->nodes[j]->available_public_ips) {
1442                         talloc_free(ctdb->nodes[j]->available_public_ips);
1443                         ctdb->nodes[j]->available_public_ips = NULL;
1444                 }
1445
1446                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1447                         continue;
1448                 }
1449
1450                 /* grab a new shiny list of public ips from the node */
1451                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1452                                         CONTROL_TIMEOUT(),
1453                                         ctdb->nodes[j]->pnn,
1454                                         ctdb->nodes,
1455                                         0,
1456                                         &ctdb->nodes[j]->known_public_ips);
1457                 if (ret != 0) {
1458                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1459                                 ctdb->nodes[j]->pnn));
1460                         if (culprit) {
1461                                 *culprit = ctdb->nodes[j]->pnn;
1462                         }
1463                         return -1;
1464                 }
1465
1466                 if (ctdb->do_checkpublicip) {
1467                         if (rec->ip_check_disable_ctx == NULL) {
1468                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1469                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1470                                         rec->need_takeover_run = true;
1471                                 }
1472                         }
1473                 }
1474
1475                 /* grab a new shiny list of public ips from the node */
1476                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1477                                         CONTROL_TIMEOUT(),
1478                                         ctdb->nodes[j]->pnn,
1479                                         ctdb->nodes,
1480                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1481                                         &ctdb->nodes[j]->available_public_ips);
1482                 if (ret != 0) {
1483                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1484                                 ctdb->nodes[j]->pnn));
1485                         if (culprit) {
1486                                 *culprit = ctdb->nodes[j]->pnn;
1487                         }
1488                         return -1;
1489                 }
1490         }
1491
1492         return 0;
1493 }
1494
1495 /* when we start a recovery, make sure all nodes use the same reclock file
1496    setting
1497 */
1498 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1499 {
1500         struct ctdb_context *ctdb = rec->ctdb;
1501         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1502         TDB_DATA data;
1503         uint32_t *nodes;
1504
1505         if (ctdb->recovery_lock_file == NULL) {
1506                 data.dptr  = NULL;
1507                 data.dsize = 0;
1508         } else {
1509                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1510                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1511         }
1512
1513         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1514         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1515                                         nodes, 0,
1516                                         CONTROL_TIMEOUT(),
1517                                         false, data,
1518                                         NULL, NULL,
1519                                         rec) != 0) {
1520                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1521                 talloc_free(tmp_ctx);
1522                 return -1;
1523         }
1524
1525         talloc_free(tmp_ctx);
1526         return 0;
1527 }
1528
1529
1530 /*
1531  * this callback is called for every node that failed to execute ctdb_takeover_run()
1532  * and set flag to re-run takeover run.
1533  */
1534 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1535 {
1536         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1537
1538         if (callback_data != NULL) {
1539                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1540
1541                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1542
1543                 ctdb_set_culprit(rec, node_pnn);
1544                 rec->need_takeover_run = true;
1545         }
1546 }
1547
1548
1549 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1550 {
1551         struct ctdb_context *ctdb = rec->ctdb;
1552         int i;
1553         struct ctdb_banning_state *ban_state;
1554
1555         *self_ban = false;
1556         for (i=0; i<ctdb->num_nodes; i++) {
1557                 if (ctdb->nodes[i]->ban_state == NULL) {
1558                         continue;
1559                 }
1560                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1561                 if (ban_state->count < 2*ctdb->num_nodes) {
1562                         continue;
1563                 }
1564
1565                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1566                         ctdb->nodes[i]->pnn, ban_state->count,
1567                         ctdb->tunable.recovery_ban_period));
1568                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1569                 ban_state->count = 0;
1570
1571                 /* Banning ourself? */
1572                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1573                         *self_ban = true;
1574                 }
1575         }
1576 }
1577
1578
1579 /*
1580   we are the recmaster, and recovery is needed - start a recovery run
1581  */
1582 static int do_recovery(struct ctdb_recoverd *rec, 
1583                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1584                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1585 {
1586         struct ctdb_context *ctdb = rec->ctdb;
1587         int i, j, ret;
1588         uint32_t generation;
1589         struct ctdb_dbid_map *dbmap;
1590         TDB_DATA data;
1591         uint32_t *nodes;
1592         struct timeval start_time;
1593         uint32_t culprit = (uint32_t)-1;
1594         bool self_ban;
1595
1596         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1597
1598         /* if recovery fails, force it again */
1599         rec->need_recovery = true;
1600
1601         ban_misbehaving_nodes(rec, &self_ban);
1602         if (self_ban) {
1603                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1604                 return -1;
1605         }
1606
1607         if (ctdb->tunable.verify_recovery_lock != 0) {
1608                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1609                 start_time = timeval_current();
1610                 if (!ctdb_recovery_lock(ctdb, true)) {
1611                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1612                                          "and ban ourself for %u seconds\n",
1613                                          ctdb->tunable.recovery_ban_period));
1614                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1615                         return -1;
1616                 }
1617                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1618                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1619         }
1620
1621         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1622
1623         /* get a list of all databases */
1624         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1625         if (ret != 0) {
1626                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1627                 return -1;
1628         }
1629
1630         /* we do the db creation before we set the recovery mode, so the freeze happens
1631            on all databases we will be dealing with. */
1632
1633         /* verify that we have all the databases any other node has */
1634         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1635         if (ret != 0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1637                 return -1;
1638         }
1639
1640         /* verify that all other nodes have all our databases */
1641         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1642         if (ret != 0) {
1643                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1644                 return -1;
1645         }
1646         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1647
1648         /* update the database priority for all remote databases */
1649         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1652         }
1653         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1654
1655
1656         /* update all other nodes to use the same setting for reclock files
1657            as the local recovery master.
1658         */
1659         sync_recovery_lock_file_across_cluster(rec);
1660
1661         /* set recovery mode to active on all nodes */
1662         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1663         if (ret != 0) {
1664                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1665                 return -1;
1666         }
1667
1668         /* execute the "startrecovery" event script on all nodes */
1669         ret = run_startrecovery_eventscript(rec, nodemap);
1670         if (ret!=0) {
1671                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1672                 return -1;
1673         }
1674
1675         /*
1676           update all nodes to have the same flags that we have
1677          */
1678         for (i=0;i<nodemap->num;i++) {
1679                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1680                         continue;
1681                 }
1682
1683                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1684                 if (ret != 0) {
1685                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1686                         return -1;
1687                 }
1688         }
1689
1690         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1691
1692         /* pick a new generation number */
1693         generation = new_generation();
1694
1695         /* change the vnnmap on this node to use the new generation 
1696            number but not on any other nodes.
1697            this guarantees that if we abort the recovery prematurely
1698            for some reason (a node stops responding?)
1699            that we can just return immediately and we will reenter
1700            recovery shortly again.
1701            I.e. we deliberately leave the cluster with an inconsistent
1702            generation id to allow us to abort recovery at any stage and
1703            just restart it from scratch.
1704          */
1705         vnnmap->generation = generation;
1706         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1707         if (ret != 0) {
1708                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1709                 return -1;
1710         }
1711
1712         data.dptr = (void *)&generation;
1713         data.dsize = sizeof(uint32_t);
1714
1715         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1716         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1717                                         nodes, 0,
1718                                         CONTROL_TIMEOUT(), false, data,
1719                                         NULL,
1720                                         transaction_start_fail_callback,
1721                                         rec) != 0) {
1722                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1723                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1724                                         nodes, 0,
1725                                         CONTROL_TIMEOUT(), false, tdb_null,
1726                                         NULL,
1727                                         NULL,
1728                                         NULL) != 0) {
1729                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1730                 }
1731                 return -1;
1732         }
1733
1734         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1735
1736         for (i=0;i<dbmap->num;i++) {
1737                 ret = recover_database(rec, mem_ctx,
1738                                        dbmap->dbs[i].dbid,
1739                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1740                                        pnn, nodemap, generation);
1741                 if (ret != 0) {
1742                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1743                         return -1;
1744                 }
1745         }
1746
1747         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1748
1749         /* commit all the changes */
1750         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1751                                         nodes, 0,
1752                                         CONTROL_TIMEOUT(), false, data,
1753                                         NULL, NULL,
1754                                         NULL) != 0) {
1755                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1756                 return -1;
1757         }
1758
1759         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1760         
1761
1762         /* update the capabilities for all nodes */
1763         ret = update_capabilities(ctdb, nodemap);
1764         if (ret!=0) {
1765                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1766                 return -1;
1767         }
1768
1769         /* build a new vnn map with all the currently active and
1770            unbanned nodes */
1771         generation = new_generation();
1772         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1773         CTDB_NO_MEMORY(ctdb, vnnmap);
1774         vnnmap->generation = generation;
1775         vnnmap->size = 0;
1776         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1777         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1778         for (i=j=0;i<nodemap->num;i++) {
1779                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1780                         continue;
1781                 }
1782                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1783                         /* this node can not be an lmaster */
1784                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1785                         continue;
1786                 }
1787
1788                 vnnmap->size++;
1789                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1790                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1791                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1792
1793         }
1794         if (vnnmap->size == 0) {
1795                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1796                 vnnmap->size++;
1797                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1798                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1799                 vnnmap->map[0] = pnn;
1800         }       
1801
1802         /* update to the new vnnmap on all nodes */
1803         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1804         if (ret != 0) {
1805                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1806                 return -1;
1807         }
1808
1809         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1810
1811         /* update recmaster to point to us for all nodes */
1812         ret = set_recovery_master(ctdb, nodemap, pnn);
1813         if (ret!=0) {
1814                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1815                 return -1;
1816         }
1817
1818         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1819
1820         /*
1821           update all nodes to have the same flags that we have
1822          */
1823         for (i=0;i<nodemap->num;i++) {
1824                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1825                         continue;
1826                 }
1827
1828                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1829                 if (ret != 0) {
1830                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1831                         return -1;
1832                 }
1833         }
1834
1835         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1836
1837         /* disable recovery mode */
1838         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1841                 return -1;
1842         }
1843
1844         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1845
1846         /*
1847           tell nodes to takeover their public IPs
1848          */
1849         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1850         if (ret != 0) {
1851                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1852                                  culprit));
1853                 rec->need_takeover_run = true;
1854                 return -1;
1855         }
1856         rec->need_takeover_run = false;
1857         ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, NULL);
1858         if (ret != 0) {
1859                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1860                 rec->need_takeover_run = true;
1861         }
1862
1863         /* execute the "recovered" event script on all nodes */
1864         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1865         if (ret!=0) {
1866                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1867                 return -1;
1868         }
1869
1870         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1871
1872         /* send a message to all clients telling them that the cluster 
1873            has been reconfigured */
1874         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1875
1876         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1877
1878         rec->need_recovery = false;
1879
1880         /* we managed to complete a full recovery, make sure to forgive
1881            any past sins by the nodes that could now participate in the
1882            recovery.
1883         */
1884         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1885         for (i=0;i<nodemap->num;i++) {
1886                 struct ctdb_banning_state *ban_state;
1887
1888                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1889                         continue;
1890                 }
1891
1892                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1893                 if (ban_state == NULL) {
1894                         continue;
1895                 }
1896
1897                 ban_state->count = 0;
1898         }
1899
1900
1901         /* We just finished a recovery successfully. 
1902            We now wait for rerecovery_timeout before we allow 
1903            another recovery to take place.
1904         */
1905         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1906         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1907         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1908
1909         return 0;
1910 }
1911
1912
1913 /*
1914   elections are won by first checking the number of connected nodes, then
1915   the priority time, then the pnn
1916  */
1917 struct election_message {
1918         uint32_t num_connected;
1919         struct timeval priority_time;
1920         uint32_t pnn;
1921         uint32_t node_flags;
1922 };
1923
1924 /*
1925   form this nodes election data
1926  */
1927 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1928 {
1929         int ret, i;
1930         struct ctdb_node_map *nodemap;
1931         struct ctdb_context *ctdb = rec->ctdb;
1932
1933         ZERO_STRUCTP(em);
1934
1935         em->pnn = rec->ctdb->pnn;
1936         em->priority_time = rec->priority_time;
1937
1938         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1939         if (ret != 0) {
1940                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1941                 return;
1942         }
1943
1944         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1945         em->node_flags = rec->node_flags;
1946
1947         for (i=0;i<nodemap->num;i++) {
1948                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1949                         em->num_connected++;
1950                 }
1951         }
1952
1953         /* we shouldnt try to win this election if we cant be a recmaster */
1954         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1955                 em->num_connected = 0;
1956                 em->priority_time = timeval_current();
1957         }
1958
1959         talloc_free(nodemap);
1960 }
1961
1962 /*
1963   see if the given election data wins
1964  */
1965 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1966 {
1967         struct election_message myem;
1968         int cmp = 0;
1969
1970         ctdb_election_data(rec, &myem);
1971
1972         /* we cant win if we dont have the recmaster capability */
1973         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1974                 return false;
1975         }
1976
1977         /* we cant win if we are banned */
1978         if (rec->node_flags & NODE_FLAGS_BANNED) {
1979                 return false;
1980         }       
1981
1982         /* we cant win if we are stopped */
1983         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1984                 return false;
1985         }       
1986
1987         /* we will automatically win if the other node is banned */
1988         if (em->node_flags & NODE_FLAGS_BANNED) {
1989                 return true;
1990         }
1991
1992         /* we will automatically win if the other node is banned */
1993         if (em->node_flags & NODE_FLAGS_STOPPED) {
1994                 return true;
1995         }
1996
1997         /* try to use the most connected node */
1998         if (cmp == 0) {
1999                 cmp = (int)myem.num_connected - (int)em->num_connected;
2000         }
2001
2002         /* then the longest running node */
2003         if (cmp == 0) {
2004                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2005         }
2006
2007         if (cmp == 0) {
2008                 cmp = (int)myem.pnn - (int)em->pnn;
2009         }
2010
2011         return cmp > 0;
2012 }
2013
2014 /*
2015   send out an election request
2016  */
2017 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2018 {
2019         int ret;
2020         TDB_DATA election_data;
2021         struct election_message emsg;
2022         uint64_t srvid;
2023         struct ctdb_context *ctdb = rec->ctdb;
2024
2025         srvid = CTDB_SRVID_RECOVERY;
2026
2027         ctdb_election_data(rec, &emsg);
2028
2029         election_data.dsize = sizeof(struct election_message);
2030         election_data.dptr  = (unsigned char *)&emsg;
2031
2032
2033         /* send an election message to all active nodes */
2034         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2035         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2036
2037
2038         /* A new node that is already frozen has entered the cluster.
2039            The existing nodes are not frozen and dont need to be frozen
2040            until the election has ended and we start the actual recovery
2041         */
2042         if (update_recmaster == true) {
2043                 /* first we assume we will win the election and set 
2044                    recoverymaster to be ourself on the current node
2045                  */
2046                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2047                 if (ret != 0) {
2048                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2049                         return -1;
2050                 }
2051         }
2052
2053
2054         return 0;
2055 }
2056
2057 /*
2058   this function will unban all nodes in the cluster
2059 */
2060 static void unban_all_nodes(struct ctdb_context *ctdb)
2061 {
2062         int ret, i;
2063         struct ctdb_node_map *nodemap;
2064         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2065         
2066         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2069                 return;
2070         }
2071
2072         for (i=0;i<nodemap->num;i++) {
2073                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2074                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2075                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2076                 }
2077         }
2078
2079         talloc_free(tmp_ctx);
2080 }
2081
2082
2083 /*
2084   we think we are winning the election - send a broadcast election request
2085  */
2086 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2087 {
2088         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2089         int ret;
2090
2091         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2094         }
2095
2096         talloc_free(rec->send_election_te);
2097         rec->send_election_te = NULL;
2098 }
2099
2100 /*
2101   handler for memory dumps
2102 */
2103 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2104                              TDB_DATA data, void *private_data)
2105 {
2106         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2107         TDB_DATA *dump;
2108         int ret;
2109         struct rd_memdump_reply *rd;
2110
2111         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2112                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2113                 talloc_free(tmp_ctx);
2114                 return;
2115         }
2116         rd = (struct rd_memdump_reply *)data.dptr;
2117
2118         dump = talloc_zero(tmp_ctx, TDB_DATA);
2119         if (dump == NULL) {
2120                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2121                 talloc_free(tmp_ctx);
2122                 return;
2123         }
2124         ret = ctdb_dump_memory(ctdb, dump);
2125         if (ret != 0) {
2126                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2127                 talloc_free(tmp_ctx);
2128                 return;
2129         }
2130
2131 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2132
2133         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2134         if (ret != 0) {
2135                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2136                 talloc_free(tmp_ctx);
2137                 return;
2138         }
2139
2140         talloc_free(tmp_ctx);
2141 }
2142
2143 /*
2144   handler for getlog
2145 */
2146 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2147                            TDB_DATA data, void *private_data)
2148 {
2149         struct ctdb_get_log_addr *log_addr;
2150         pid_t child;
2151
2152         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2153                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2154                 return;
2155         }
2156         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2157
2158         child = ctdb_fork_no_free_ringbuffer(ctdb);
2159         if (child == (pid_t)-1) {
2160                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2161                 return;
2162         }
2163
2164         if (child == 0) {
2165                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2166                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2167                         _exit(1);
2168                 }
2169                 ctdb_collect_log(ctdb, log_addr);
2170                 _exit(0);
2171         }
2172 }
2173
2174 /*
2175   handler for clearlog
2176 */
2177 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2178                              TDB_DATA data, void *private_data)
2179 {
2180         ctdb_clear_log(ctdb);
2181 }
2182
2183 /*
2184   handler for reload_nodes
2185 */
2186 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2187                              TDB_DATA data, void *private_data)
2188 {
2189         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2190
2191         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2192
2193         reload_nodes_file(rec->ctdb);
2194 }
2195
2196
2197 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2198                               struct timeval yt, void *p)
2199 {
2200         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2201
2202         talloc_free(rec->ip_check_disable_ctx);
2203         rec->ip_check_disable_ctx = NULL;
2204 }
2205
2206
2207 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2208                                   struct timeval t, void *p)
2209 {
2210         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2211         struct ctdb_context *ctdb = rec->ctdb;
2212         int ret;
2213
2214         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2215
2216         ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2217         if (ret != 0) {
2218                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2219                 rec->need_takeover_run = true;
2220         }
2221
2222         talloc_free(rec->deferred_rebalance_ctx);
2223         rec->deferred_rebalance_ctx = NULL;
2224 }
2225
2226         
2227 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2228                              TDB_DATA data, void *private_data)
2229 {
2230         uint32_t pnn;
2231         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2232
2233         if (data.dsize != sizeof(uint32_t)) {
2234                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2235                 return;
2236         }
2237
2238         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2239                 return;
2240         }
2241
2242         pnn = *(uint32_t *)&data.dptr[0];
2243
2244         lcp2_forcerebalance(ctdb, pnn);
2245         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2246
2247         if (rec->deferred_rebalance_ctx != NULL) {
2248                 talloc_free(rec->deferred_rebalance_ctx);
2249         }
2250         rec->deferred_rebalance_ctx = talloc_new(rec);
2251         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2252                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2253                         ctdb_rebalance_timeout, rec);
2254 }
2255
2256
2257
2258 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2259                              TDB_DATA data, void *private_data)
2260 {
2261         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2262         struct ctdb_public_ip *ip;
2263
2264         if (rec->recmaster != rec->ctdb->pnn) {
2265                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2266                 return;
2267         }
2268
2269         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2270                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2271                 return;
2272         }
2273
2274         ip = (struct ctdb_public_ip *)data.dptr;
2275
2276         update_ip_assignment_tree(rec->ctdb, ip);
2277 }
2278
2279
2280 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2281                              TDB_DATA data, void *private_data)
2282 {
2283         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2284         uint32_t timeout;
2285
2286         if (rec->ip_check_disable_ctx != NULL) {
2287                 talloc_free(rec->ip_check_disable_ctx);
2288                 rec->ip_check_disable_ctx = NULL;
2289         }
2290
2291         if (data.dsize != sizeof(uint32_t)) {
2292                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2293                                  "expexting %lu\n", (long unsigned)data.dsize,
2294                                  (long unsigned)sizeof(uint32_t)));
2295                 return;
2296         }
2297         if (data.dptr == NULL) {
2298                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2299                 return;
2300         }
2301
2302         timeout = *((uint32_t *)data.dptr);
2303
2304         if (timeout == 0) {
2305                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2306                 return;
2307         }
2308                 
2309         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2310
2311         rec->ip_check_disable_ctx = talloc_new(rec);
2312         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2313
2314         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2315 }
2316
2317
2318 /*
2319   handler for reload all ips.
2320 */
2321 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2322                              TDB_DATA data, void *private_data)
2323 {
2324         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2325
2326         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2327                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2328                 return;
2329         }
2330
2331         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2332
2333         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2334         return;
2335 }
2336
2337 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2338 {
2339         uint32_t *status = callback_data;
2340
2341         if (res != 0) {
2342                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2343                 *status = 1;
2344         }
2345 }
2346
2347 static int
2348 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2349 {
2350         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2351         uint32_t *nodes;
2352         uint32_t status;
2353         int i;
2354
2355         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2356         for (i = 0; i< nodemap->num; i++) {
2357                 if (nodemap->nodes[i].flags != 0) {
2358                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2359                         talloc_free(tmp_ctx);
2360                         return -1;
2361                 }
2362         }
2363
2364         /* send the flags update to all connected nodes */
2365         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2366         status = 0;
2367         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2368                                         nodes, 0,
2369                                         CONTROL_TIMEOUT(),
2370                                         false, tdb_null,
2371                                         async_reloadips_callback, NULL,
2372                                         &status) != 0) {
2373                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2374                 talloc_free(tmp_ctx);
2375                 return -1;
2376         }
2377
2378         if (status != 0) {
2379                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2380                 talloc_free(tmp_ctx);
2381                 return -1;
2382         }
2383
2384         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2385
2386         talloc_free(tmp_ctx);
2387         return 0;
2388 }
2389
2390
2391 /*
2392   handler for ip reallocate, just add it to the list of callers and 
2393   handle this later in the monitor_cluster loop so we do not recurse
2394   with other callers to takeover_run()
2395 */
2396 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2397                              TDB_DATA data, void *private_data)
2398 {
2399         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2400         struct ip_reallocate_list *caller;
2401
2402         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2403                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2404                 return;
2405         }
2406
2407         if (rec->ip_reallocate_ctx == NULL) {
2408                 rec->ip_reallocate_ctx = talloc_new(rec);
2409                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2410         }
2411
2412         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2413         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2414
2415         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2416         caller->next = rec->reallocate_callers;
2417         rec->reallocate_callers = caller;
2418
2419         return;
2420 }
2421
2422 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2423 {
2424         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2425         TDB_DATA result;
2426         int32_t ret;
2427         struct ip_reallocate_list *callers;
2428         uint32_t culprit;
2429
2430         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2431
2432         /* update the list of public ips that a node can handle for
2433            all connected nodes
2434         */
2435         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2436         if (ret != 0) {
2437                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2438                                  culprit));
2439                 rec->need_takeover_run = true;
2440         }
2441         if (ret == 0) {
2442                 ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2443                 if (ret != 0) {
2444                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2445                         rec->need_takeover_run = true;
2446                 }
2447         }
2448
2449         result.dsize = sizeof(int32_t);
2450         result.dptr  = (uint8_t *)&ret;
2451
2452         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2453
2454                 /* Someone that sent srvid==0 does not want a reply */
2455                 if (callers->rd->srvid == 0) {
2456                         continue;
2457                 }
2458                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2459                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2460                                   (unsigned long long)callers->rd->srvid));
2461                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2462                 if (ret != 0) {
2463                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2464                                          "message to %u:%llu\n",
2465                                          (unsigned)callers->rd->pnn,
2466                                          (unsigned long long)callers->rd->srvid));
2467                 }
2468         }
2469
2470         talloc_free(tmp_ctx);
2471         talloc_free(rec->ip_reallocate_ctx);
2472         rec->ip_reallocate_ctx = NULL;
2473         rec->reallocate_callers = NULL;
2474         
2475 }
2476
2477
2478 /*
2479   handler for recovery master elections
2480 */
2481 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2482                              TDB_DATA data, void *private_data)
2483 {
2484         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2485         int ret;
2486         struct election_message *em = (struct election_message *)data.dptr;
2487         TALLOC_CTX *mem_ctx;
2488
2489         /* we got an election packet - update the timeout for the election */
2490         talloc_free(rec->election_timeout);
2491         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2492                                                 fast_start ?
2493                                                 timeval_current_ofs(0, 500000) :
2494                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2495                                                 ctdb_election_timeout, rec);
2496
2497         mem_ctx = talloc_new(ctdb);
2498
2499         /* someone called an election. check their election data
2500            and if we disagree and we would rather be the elected node, 
2501            send a new election message to all other nodes
2502          */
2503         if (ctdb_election_win(rec, em)) {
2504                 if (!rec->send_election_te) {
2505                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2506                                                                 timeval_current_ofs(0, 500000),
2507                                                                 election_send_request, rec);
2508                 }
2509                 talloc_free(mem_ctx);
2510                 /*unban_all_nodes(ctdb);*/
2511                 return;
2512         }
2513         
2514         /* we didn't win */
2515         talloc_free(rec->send_election_te);
2516         rec->send_election_te = NULL;
2517
2518         if (ctdb->tunable.verify_recovery_lock != 0) {
2519                 /* release the recmaster lock */
2520                 if (em->pnn != ctdb->pnn &&
2521                     ctdb->recovery_lock_fd != -1) {
2522                         close(ctdb->recovery_lock_fd);
2523                         ctdb->recovery_lock_fd = -1;
2524                         unban_all_nodes(ctdb);
2525                 }
2526         }
2527
2528         /* ok, let that guy become recmaster then */
2529         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2530         if (ret != 0) {
2531                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2532                 talloc_free(mem_ctx);
2533                 return;
2534         }
2535
2536         talloc_free(mem_ctx);
2537         return;
2538 }
2539
2540
2541 /*
2542   force the start of the election process
2543  */
2544 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2545                            struct ctdb_node_map *nodemap)
2546 {
2547         int ret;
2548         struct ctdb_context *ctdb = rec->ctdb;
2549
2550         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2551
2552         /* set all nodes to recovery mode to stop all internode traffic */
2553         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2554         if (ret != 0) {
2555                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2556                 return;
2557         }
2558
2559         talloc_free(rec->election_timeout);
2560         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2561                                                 fast_start ?
2562                                                 timeval_current_ofs(0, 500000) :
2563                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2564                                                 ctdb_election_timeout, rec);
2565
2566         ret = send_election_request(rec, pnn, true);
2567         if (ret!=0) {
2568                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2569                 return;
2570         }
2571
2572         /* wait for a few seconds to collect all responses */
2573         ctdb_wait_election(rec);
2574 }
2575
2576
2577
2578 /*
2579   handler for when a node changes its flags
2580 */
2581 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2582                             TDB_DATA data, void *private_data)
2583 {
2584         int ret;
2585         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2586         struct ctdb_node_map *nodemap=NULL;
2587         TALLOC_CTX *tmp_ctx;
2588         int i;
2589         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2590         int disabled_flag_changed;
2591
2592         if (data.dsize != sizeof(*c)) {
2593                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2594                 return;
2595         }
2596
2597         tmp_ctx = talloc_new(ctdb);
2598         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2599
2600         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2601         if (ret != 0) {
2602                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2603                 talloc_free(tmp_ctx);
2604                 return;         
2605         }
2606
2607
2608         for (i=0;i<nodemap->num;i++) {
2609                 if (nodemap->nodes[i].pnn == c->pnn) break;
2610         }
2611
2612         if (i == nodemap->num) {
2613                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2614                 talloc_free(tmp_ctx);
2615                 return;
2616         }
2617
2618         if (nodemap->nodes[i].flags != c->new_flags) {
2619                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2620         }
2621
2622         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2623
2624         nodemap->nodes[i].flags = c->new_flags;
2625
2626         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2627                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2628
2629         if (ret == 0) {
2630                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2631                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2632         }
2633         
2634         if (ret == 0 &&
2635             ctdb->recovery_master == ctdb->pnn &&
2636             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2637                 /* Only do the takeover run if the perm disabled or unhealthy
2638                    flags changed since these will cause an ip failover but not
2639                    a recovery.
2640                    If the node became disconnected or banned this will also
2641                    lead to an ip address failover but that is handled 
2642                    during recovery
2643                 */
2644                 if (disabled_flag_changed) {
2645                         rec->need_takeover_run = true;
2646                 }
2647         }
2648
2649         talloc_free(tmp_ctx);
2650 }
2651
2652 /*
2653   handler for when we need to push out flag changes ot all other nodes
2654 */
2655 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2656                             TDB_DATA data, void *private_data)
2657 {
2658         int ret;
2659         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2660         struct ctdb_node_map *nodemap=NULL;
2661         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2662         uint32_t recmaster;
2663         uint32_t *nodes;
2664
2665         /* find the recovery master */
2666         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2667         if (ret != 0) {
2668                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2669                 talloc_free(tmp_ctx);
2670                 return;
2671         }
2672
2673         /* read the node flags from the recmaster */
2674         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2675         if (ret != 0) {
2676                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2677                 talloc_free(tmp_ctx);
2678                 return;
2679         }
2680         if (c->pnn >= nodemap->num) {
2681                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2682                 talloc_free(tmp_ctx);
2683                 return;
2684         }
2685
2686         /* send the flags update to all connected nodes */
2687         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2688
2689         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2690                                       nodes, 0, CONTROL_TIMEOUT(),
2691                                       false, data,
2692                                       NULL, NULL,
2693                                       NULL) != 0) {
2694                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2695
2696                 talloc_free(tmp_ctx);
2697                 return;
2698         }
2699
2700         talloc_free(tmp_ctx);
2701 }
2702
2703
2704 struct verify_recmode_normal_data {
2705         uint32_t count;
2706         enum monitor_result status;
2707 };
2708
2709 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2710 {
2711         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2712
2713
2714         /* one more node has responded with recmode data*/
2715         rmdata->count--;
2716
2717         /* if we failed to get the recmode, then return an error and let
2718            the main loop try again.
2719         */
2720         if (state->state != CTDB_CONTROL_DONE) {
2721                 if (rmdata->status == MONITOR_OK) {
2722                         rmdata->status = MONITOR_FAILED;
2723                 }
2724                 return;
2725         }
2726
2727         /* if we got a response, then the recmode will be stored in the
2728            status field
2729         */
2730         if (state->status != CTDB_RECOVERY_NORMAL) {
2731                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2732                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2733         }
2734
2735         return;
2736 }
2737
2738
2739 /* verify that all nodes are in normal recovery mode */
2740 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2741 {
2742         struct verify_recmode_normal_data *rmdata;
2743         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2744         struct ctdb_client_control_state *state;
2745         enum monitor_result status;
2746         int j;
2747         
2748         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2749         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2750         rmdata->count  = 0;
2751         rmdata->status = MONITOR_OK;
2752
2753         /* loop over all active nodes and send an async getrecmode call to 
2754            them*/
2755         for (j=0; j<nodemap->num; j++) {
2756                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2757                         continue;
2758                 }
2759                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2760                                         CONTROL_TIMEOUT(), 
2761                                         nodemap->nodes[j].pnn);
2762                 if (state == NULL) {
2763                         /* we failed to send the control, treat this as 
2764                            an error and try again next iteration
2765                         */                      
2766                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2767                         talloc_free(mem_ctx);
2768                         return MONITOR_FAILED;
2769                 }
2770
2771                 /* set up the callback functions */
2772                 state->async.fn = verify_recmode_normal_callback;
2773                 state->async.private_data = rmdata;
2774
2775                 /* one more control to wait for to complete */
2776                 rmdata->count++;
2777         }
2778
2779
2780         /* now wait for up to the maximum number of seconds allowed
2781            or until all nodes we expect a response from has replied
2782         */
2783         while (rmdata->count > 0) {
2784                 event_loop_once(ctdb->ev);
2785         }
2786
2787         status = rmdata->status;
2788         talloc_free(mem_ctx);
2789         return status;
2790 }
2791
2792
2793 struct verify_recmaster_data {
2794         struct ctdb_recoverd *rec;
2795         uint32_t count;
2796         uint32_t pnn;
2797         enum monitor_result status;
2798 };
2799
2800 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2801 {
2802         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2803
2804
2805         /* one more node has responded with recmaster data*/
2806         rmdata->count--;
2807
2808         /* if we failed to get the recmaster, then return an error and let
2809            the main loop try again.
2810         */
2811         if (state->state != CTDB_CONTROL_DONE) {
2812                 if (rmdata->status == MONITOR_OK) {
2813                         rmdata->status = MONITOR_FAILED;
2814                 }
2815                 return;
2816         }
2817
2818         /* if we got a response, then the recmaster will be stored in the
2819            status field
2820         */
2821         if (state->status != rmdata->pnn) {
2822                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2823                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2824                 rmdata->status = MONITOR_ELECTION_NEEDED;
2825         }
2826
2827         return;
2828 }
2829
2830
2831 /* verify that all nodes agree that we are the recmaster */
2832 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2833 {
2834         struct ctdb_context *ctdb = rec->ctdb;
2835         struct verify_recmaster_data *rmdata;
2836         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2837         struct ctdb_client_control_state *state;
2838         enum monitor_result status;
2839         int j;
2840         
2841         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2842         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2843         rmdata->rec    = rec;
2844         rmdata->count  = 0;
2845         rmdata->pnn    = pnn;
2846         rmdata->status = MONITOR_OK;
2847
2848         /* loop over all active nodes and send an async getrecmaster call to 
2849            them*/
2850         for (j=0; j<nodemap->num; j++) {
2851                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2852                         continue;
2853                 }
2854                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2855                                         CONTROL_TIMEOUT(),
2856                                         nodemap->nodes[j].pnn);
2857                 if (state == NULL) {
2858                         /* we failed to send the control, treat this as 
2859                            an error and try again next iteration
2860                         */                      
2861                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2862                         talloc_free(mem_ctx);
2863                         return MONITOR_FAILED;
2864                 }
2865
2866                 /* set up the callback functions */
2867                 state->async.fn = verify_recmaster_callback;
2868                 state->async.private_data = rmdata;
2869
2870                 /* one more control to wait for to complete */
2871                 rmdata->count++;
2872         }
2873
2874
2875         /* now wait for up to the maximum number of seconds allowed
2876            or until all nodes we expect a response from has replied
2877         */
2878         while (rmdata->count > 0) {
2879                 event_loop_once(ctdb->ev);
2880         }
2881
2882         status = rmdata->status;
2883         talloc_free(mem_ctx);
2884         return status;
2885 }
2886
2887 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2888                                     struct ctdb_recoverd *rec)
2889 {
2890         struct ctdb_control_get_ifaces *ifaces = NULL;
2891         TALLOC_CTX *mem_ctx;
2892         bool ret = false;
2893
2894         mem_ctx = talloc_new(NULL);
2895
2896         /* Read the interfaces from the local node */
2897         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2898                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2899                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2900                 /* We could return an error.  However, this will be
2901                  * rare so we'll decide that the interfaces have
2902                  * actually changed, just in case.
2903                  */
2904                 talloc_free(mem_ctx);
2905                 return true;
2906         }
2907
2908         if (!rec->ifaces) {
2909                 /* We haven't been here before so things have changed */
2910                 ret = true;
2911         } else if (rec->ifaces->num != ifaces->num) {
2912                 /* Number of interfaces has changed */
2913                 ret = true;
2914         } else {
2915                 /* See if interface names or link states have changed */
2916                 int i;
2917                 for (i = 0; i < rec->ifaces->num; i++) {
2918                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2919                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0 ||
2920                             iface->link_state != ifaces->ifaces[i].link_state) {
2921                                 ret = true;
2922                                 break;
2923                         }
2924                 }
2925         }
2926
2927         talloc_free(rec->ifaces);
2928         rec->ifaces = talloc_steal(rec, ifaces);
2929
2930         talloc_free(mem_ctx);
2931         return ret;
2932 }
2933
2934 /* called to check that the local allocation of public ip addresses is ok.
2935 */
2936 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2937 {
2938         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2939         struct ctdb_uptime *uptime1 = NULL;
2940         struct ctdb_uptime *uptime2 = NULL;
2941         int ret, j;
2942         bool need_takeover_run = false;
2943
2944         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2945                                 CTDB_CURRENT_NODE, &uptime1);
2946         if (ret != 0) {
2947                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2948                 talloc_free(mem_ctx);
2949                 return -1;
2950         }
2951
2952         if (interfaces_have_changed(ctdb, rec)) {
2953                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2954                                      "local node %u - force takeover run\n",
2955                                      pnn));
2956                 need_takeover_run = true;
2957         }
2958
2959         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2960                                 CTDB_CURRENT_NODE, &uptime2);
2961         if (ret != 0) {
2962                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2963                 talloc_free(mem_ctx);
2964                 return -1;
2965         }
2966
2967         /* skip the check if the startrecovery time has changed */
2968         if (timeval_compare(&uptime1->last_recovery_started,
2969                             &uptime2->last_recovery_started) != 0) {
2970                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2971                 talloc_free(mem_ctx);
2972                 return 0;
2973         }
2974
2975         /* skip the check if the endrecovery time has changed */
2976         if (timeval_compare(&uptime1->last_recovery_finished,
2977                             &uptime2->last_recovery_finished) != 0) {
2978                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2979                 talloc_free(mem_ctx);
2980                 return 0;
2981         }
2982
2983         /* skip the check if we have started but not finished recovery */
2984         if (timeval_compare(&uptime1->last_recovery_finished,
2985                             &uptime1->last_recovery_started) != 1) {
2986                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2987                 talloc_free(mem_ctx);
2988
2989                 return 0;
2990         }
2991
2992         /* verify that we have the ip addresses we should have
2993            and we dont have ones we shouldnt have.
2994            if we find an inconsistency we set recmode to
2995            active on the local node and wait for the recmaster
2996            to do a full blown recovery.
2997            also if the pnn is -1 and we are healthy and can host the ip
2998            we also request a ip reallocation.
2999         */
3000         if (ctdb->tunable.disable_ip_failover == 0) {
3001                 struct ctdb_all_public_ips *ips = NULL;
3002
3003                 /* read the *available* IPs from the local node */
3004                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3005                 if (ret != 0) {
3006                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3007                         talloc_free(mem_ctx);
3008                         return -1;
3009                 }
3010
3011                 for (j=0; j<ips->num; j++) {
3012                         if (ips->ips[j].pnn == -1 &&
3013                             nodemap->nodes[pnn].flags == 0) {
3014                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3015                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3016                                 need_takeover_run = true;
3017                         }
3018                 }
3019
3020                 talloc_free(ips);
3021
3022                 /* read the *known* IPs from the local node */
3023                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3024                 if (ret != 0) {
3025                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3026                         talloc_free(mem_ctx);
3027                         return -1;
3028                 }
3029
3030                 for (j=0; j<ips->num; j++) {
3031                         if (ips->ips[j].pnn == pnn) {
3032                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3033                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3034                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3035                                         need_takeover_run = true;
3036                                 }
3037                         } else {
3038                                 if (ctdb->do_checkpublicip &&
3039                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3040
3041                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3042                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3043
3044                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3045                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3046                                         }
3047                                 }
3048                         }
3049                 }
3050         }
3051
3052         if (need_takeover_run) {
3053                 struct takeover_run_reply rd;
3054                 TDB_DATA data;
3055
3056                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3057
3058                 rd.pnn = ctdb->pnn;
3059                 rd.srvid = 0;
3060                 data.dptr = (uint8_t *)&rd;
3061                 data.dsize = sizeof(rd);
3062
3063                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3064                 if (ret != 0) {
3065                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3066                 }
3067         }
3068         talloc_free(mem_ctx);
3069         return 0;
3070 }
3071
3072
3073 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3074 {
3075         struct ctdb_node_map **remote_nodemaps = callback_data;
3076
3077         if (node_pnn >= ctdb->num_nodes) {
3078                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3079                 return;
3080         }
3081
3082         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3083
3084 }
3085
3086 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3087         struct ctdb_node_map *nodemap,
3088         struct ctdb_node_map **remote_nodemaps)
3089 {
3090         uint32_t *nodes;
3091
3092         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3093         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3094                                         nodes, 0,
3095                                         CONTROL_TIMEOUT(), false, tdb_null,
3096                                         async_getnodemap_callback,
3097                                         NULL,
3098                                         remote_nodemaps) != 0) {
3099                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3100
3101                 return -1;
3102         }
3103
3104         return 0;
3105 }
3106
3107 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3108 struct ctdb_check_reclock_state {
3109         struct ctdb_context *ctdb;
3110         struct timeval start_time;
3111         int fd[2];
3112         pid_t child;
3113         struct timed_event *te;
3114         struct fd_event *fde;
3115         enum reclock_child_status status;
3116 };
3117
3118 /* when we free the reclock state we must kill any child process.
3119 */
3120 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3121 {
3122         struct ctdb_context *ctdb = state->ctdb;
3123
3124         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3125
3126         if (state->fd[0] != -1) {
3127                 close(state->fd[0]);
3128                 state->fd[0] = -1;
3129         }
3130         if (state->fd[1] != -1) {
3131                 close(state->fd[1]);
3132                 state->fd[1] = -1;
3133         }
3134         ctdb_kill(ctdb, state->child, SIGKILL);
3135         return 0;
3136 }
3137
3138 /*
3139   called if our check_reclock child times out. this would happen if
3140   i/o to the reclock file blocks.
3141  */
3142 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3143                                          struct timeval t, void *private_data)
3144 {
3145         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3146                                            struct ctdb_check_reclock_state);
3147
3148         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3149         state->status = RECLOCK_TIMEOUT;
3150 }
3151
3152 /* this is called when the child process has completed checking the reclock
3153    file and has written data back to us through the pipe.
3154 */
3155 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3156                              uint16_t flags, void *private_data)
3157 {
3158         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3159                                              struct ctdb_check_reclock_state);
3160         char c = 0;
3161         int ret;
3162
3163         /* we got a response from our child process so we can abort the
3164            timeout.
3165         */
3166         talloc_free(state->te);
3167         state->te = NULL;
3168
3169         ret = read(state->fd[0], &c, 1);
3170         if (ret != 1 || c != RECLOCK_OK) {
3171                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3172                 state->status = RECLOCK_FAILED;
3173
3174                 return;
3175         }
3176
3177         state->status = RECLOCK_OK;
3178         return;
3179 }
3180
3181 static int check_recovery_lock(struct ctdb_context *ctdb)
3182 {
3183         int ret;
3184         struct ctdb_check_reclock_state *state;
3185         pid_t parent = getpid();
3186
3187         if (ctdb->recovery_lock_fd == -1) {
3188                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3189                 return -1;
3190         }
3191
3192         state = talloc(ctdb, struct ctdb_check_reclock_state);
3193         CTDB_NO_MEMORY(ctdb, state);
3194
3195         state->ctdb = ctdb;
3196         state->start_time = timeval_current();
3197         state->status = RECLOCK_CHECKING;
3198         state->fd[0] = -1;
3199         state->fd[1] = -1;
3200
3201         ret = pipe(state->fd);
3202         if (ret != 0) {
3203                 talloc_free(state);
3204                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3205                 return -1;
3206         }
3207
3208         state->child = ctdb_fork(ctdb);
3209         if (state->child == (pid_t)-1) {
3210                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3211                 close(state->fd[0]);
3212                 state->fd[0] = -1;
3213                 close(state->fd[1]);
3214                 state->fd[1] = -1;
3215                 talloc_free(state);
3216                 return -1;
3217         }
3218
3219         if (state->child == 0) {
3220                 char cc = RECLOCK_OK;
3221                 close(state->fd[0]);
3222                 state->fd[0] = -1;
3223
3224                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3225                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3226                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3227                         cc = RECLOCK_FAILED;
3228                 }
3229
3230                 write(state->fd[1], &cc, 1);
3231                 /* make sure we die when our parent dies */
3232                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3233                         sleep(5);
3234                         write(state->fd[1], &cc, 1);
3235                 }
3236                 _exit(0);
3237         }
3238         close(state->fd[1]);
3239         state->fd[1] = -1;
3240         set_close_on_exec(state->fd[0]);
3241
3242         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3243
3244         talloc_set_destructor(state, check_reclock_destructor);
3245
3246         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3247                                     ctdb_check_reclock_timeout, state);
3248         if (state->te == NULL) {
3249                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3250                 talloc_free(state);
3251                 return -1;
3252         }
3253
3254         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3255                                 EVENT_FD_READ,
3256                                 reclock_child_handler,
3257                                 (void *)state);
3258
3259         if (state->fde == NULL) {
3260                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3261                 talloc_free(state);
3262                 return -1;
3263         }
3264         tevent_fd_set_auto_close(state->fde);
3265
3266         while (state->status == RECLOCK_CHECKING) {
3267                 event_loop_once(ctdb->ev);
3268         }
3269
3270         if (state->status == RECLOCK_FAILED) {
3271                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3272                 close(ctdb->recovery_lock_fd);
3273                 ctdb->recovery_lock_fd = -1;
3274                 talloc_free(state);
3275                 return -1;
3276         }
3277
3278         talloc_free(state);
3279         return 0;
3280 }
3281
3282 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3283 {
3284         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3285         const char *reclockfile;
3286
3287         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3288                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3289                 talloc_free(tmp_ctx);
3290                 return -1;      
3291         }
3292
3293         if (reclockfile == NULL) {
3294                 if (ctdb->recovery_lock_file != NULL) {
3295                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3296                         talloc_free(ctdb->recovery_lock_file);
3297                         ctdb->recovery_lock_file = NULL;
3298                         if (ctdb->recovery_lock_fd != -1) {
3299                                 close(ctdb->recovery_lock_fd);
3300                                 ctdb->recovery_lock_fd = -1;
3301                         }
3302                 }
3303                 ctdb->tunable.verify_recovery_lock = 0;
3304                 talloc_free(tmp_ctx);
3305                 return 0;
3306         }
3307
3308         if (ctdb->recovery_lock_file == NULL) {
3309                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3310                 if (ctdb->recovery_lock_fd != -1) {
3311                         close(ctdb->recovery_lock_fd);
3312                         ctdb->recovery_lock_fd = -1;
3313                 }
3314                 talloc_free(tmp_ctx);
3315                 return 0;
3316         }
3317
3318
3319         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3320                 talloc_free(tmp_ctx);
3321                 return 0;
3322         }
3323
3324         talloc_free(ctdb->recovery_lock_file);
3325         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3326         ctdb->tunable.verify_recovery_lock = 0;
3327         if (ctdb->recovery_lock_fd != -1) {
3328                 close(ctdb->recovery_lock_fd);
3329                 ctdb->recovery_lock_fd = -1;
3330         }
3331
3332         talloc_free(tmp_ctx);
3333         return 0;
3334 }
3335
3336 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3337                       TALLOC_CTX *mem_ctx)
3338 {
3339         uint32_t pnn;
3340         struct ctdb_node_map *nodemap=NULL;
3341         struct ctdb_node_map *recmaster_nodemap=NULL;
3342         struct ctdb_node_map **remote_nodemaps=NULL;
3343         struct ctdb_vnn_map *vnnmap=NULL;
3344         struct ctdb_vnn_map *remote_vnnmap=NULL;
3345         int32_t debug_level;
3346         int i, j, ret;
3347         bool self_ban;
3348
3349
3350         /* verify that the main daemon is still running */
3351         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3352                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3353                 exit(-1);
3354         }
3355
3356         /* ping the local daemon to tell it we are alive */
3357         ctdb_ctrl_recd_ping(ctdb);
3358
3359         if (rec->election_timeout) {
3360                 /* an election is in progress */
3361                 return;
3362         }
3363
3364         /* read the debug level from the parent and update locally */
3365         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3366         if (ret !=0) {
3367                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3368                 return;
3369         }
3370         LogLevel = debug_level;
3371
3372         /* get relevant tunables */
3373         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3374         if (ret != 0) {
3375                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3376                 return;
3377         }
3378
3379         /* get the current recovery lock file from the server */
3380         if (update_recovery_lock_file(ctdb) != 0) {
3381                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3382                 return;
3383         }
3384
3385         /* Make sure that if recovery lock verification becomes disabled when
3386            we close the file
3387         */
3388         if (ctdb->tunable.verify_recovery_lock == 0) {
3389                 if (ctdb->recovery_lock_fd != -1) {
3390                         close(ctdb->recovery_lock_fd);
3391                         ctdb->recovery_lock_fd = -1;
3392                 }
3393         }
3394
3395         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3396         if (pnn == (uint32_t)-1) {
3397                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3398                 return;
3399         }
3400
3401         /* get the vnnmap */
3402         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3403         if (ret != 0) {
3404                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3405                 return;
3406         }
3407
3408
3409         /* get number of nodes */
3410         if (rec->nodemap) {
3411                 talloc_free(rec->nodemap);
3412                 rec->nodemap = NULL;
3413                 nodemap=NULL;
3414         }
3415         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3416         if (ret != 0) {
3417                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3418                 return;
3419         }
3420         nodemap = rec->nodemap;
3421
3422         /* remember our own node flags */
3423         rec->node_flags = nodemap->nodes[pnn].flags;
3424
3425         ban_misbehaving_nodes(rec, &self_ban);
3426         if (self_ban) {
3427                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3428                 return;
3429         }
3430
3431         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3432            also frozen and thet the recmode is set to active.
3433         */
3434         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3435                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3436                 if (ret != 0) {
3437                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3438                 }
3439                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3440                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3441
3442                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3443                         if (ret != 0) {
3444                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3445                                 return;
3446                         }
3447                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3448                         if (ret != 0) {
3449                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3450
3451                                 return;
3452                         }
3453                 }
3454
3455                 /* If this node is stopped or banned then it is not the recovery
3456                  * master, so don't do anything. This prevents stopped or banned
3457                  * node from starting election and sending unnecessary controls.
3458                  */
3459                 return;
3460         }
3461
3462         /* check which node is the recovery master */
3463         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3464         if (ret != 0) {
3465                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3466                 return;
3467         }
3468
3469         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3470         if (rec->recmaster != pnn) {
3471                 if (rec->ip_reallocate_ctx != NULL) {
3472                         talloc_free(rec->ip_reallocate_ctx);
3473                         rec->ip_reallocate_ctx = NULL;
3474                         rec->reallocate_callers = NULL;
3475                 }
3476         }
3477
3478         /* This is a special case.  When recovery daemon is started, recmaster
3479          * is set to -1.  If a node is not started in stopped state, then
3480          * start election to decide recovery master
3481          */
3482         if (rec->recmaster == (uint32_t)-1) {
3483                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3484                 force_election(rec, pnn, nodemap);
3485                 return;
3486         }
3487
3488         /* update the capabilities for all nodes */
3489         ret = update_capabilities(ctdb, nodemap);
3490         if (ret != 0) {
3491                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3492                 return;
3493         }
3494
3495         /*
3496          * if the current recmaster do not have CTDB_CAP_RECMASTER,
3497          * but we have force an election and try to become the new
3498          * recmaster
3499          */
3500         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3501             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3502              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3503                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3504                                   " but we (node %u) have - force an election\n",
3505                                   rec->recmaster, pnn));
3506                 force_election(rec, pnn, nodemap);
3507                 return;
3508         }
3509
3510         /* count how many active nodes there are */
3511         rec->num_active    = 0;
3512         rec->num_connected = 0;
3513         for (i=0; i<nodemap->num; i++) {
3514                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3515                         rec->num_active++;
3516                 }
3517                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3518                         rec->num_connected++;
3519                 }
3520         }
3521
3522
3523         /* verify that the recmaster node is still active */
3524         for (j=0; j<nodemap->num; j++) {
3525                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3526                         break;
3527                 }
3528         }
3529
3530         if (j == nodemap->num) {
3531                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3532                 force_election(rec, pnn, nodemap);
3533                 return;
3534         }
3535
3536         /* if recovery master is disconnected we must elect a new recmaster */
3537         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3538                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3539                 force_election(rec, pnn, nodemap);
3540                 return;
3541         }
3542
3543         /* get nodemap from the recovery master to check if it is inactive */
3544         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3545                                    mem_ctx, &recmaster_nodemap);
3546         if (ret != 0) {
3547                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3548                           nodemap->nodes[j].pnn));
3549                 return;
3550         }
3551
3552
3553         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3554             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3555                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3556                 force_election(rec, pnn, nodemap);
3557                 return;
3558         }
3559
3560         /* verify that we have all ip addresses we should have and we dont
3561          * have addresses we shouldnt have.
3562          */ 
3563         if (ctdb->tunable.disable_ip_failover == 0) {
3564                 if (rec->ip_check_disable_ctx == NULL) {
3565                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3566                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3567                         }
3568                 }
3569         }
3570
3571
3572         /* if we are not the recmaster then we do not need to check
3573            if recovery is needed
3574          */
3575         if (pnn != rec->recmaster) {
3576                 return;
3577         }
3578
3579
3580         /* ensure our local copies of flags are right */
3581         ret = update_local_flags(rec, nodemap);
3582         if (ret == MONITOR_ELECTION_NEEDED) {
3583                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3584                 force_election(rec, pnn, nodemap);
3585                 return;
3586         }
3587         if (ret != MONITOR_OK) {
3588                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3589                 return;
3590         }
3591
3592         if (ctdb->num_nodes != nodemap->num) {
3593                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3594                 reload_nodes_file(ctdb);
3595                 return;
3596         }
3597
3598         /* verify that all active nodes agree that we are the recmaster */
3599         switch (verify_recmaster(rec, nodemap, pnn)) {
3600         case MONITOR_RECOVERY_NEEDED:
3601                 /* can not happen */
3602                 return;
3603         case MONITOR_ELECTION_NEEDED:
3604                 force_election(rec, pnn, nodemap);
3605                 return;
3606         case MONITOR_OK:
3607                 break;
3608         case MONITOR_FAILED:
3609                 return;
3610         }
3611
3612
3613         if (rec->need_recovery) {
3614                 /* a previous recovery didn't finish */
3615                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3616                 return;
3617         }
3618
3619         /* verify that all active nodes are in normal mode 
3620            and not in recovery mode 
3621         */
3622         switch (verify_recmode(ctdb, nodemap)) {
3623         case MONITOR_RECOVERY_NEEDED:
3624                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3625                 return;
3626         case MONITOR_FAILED:
3627                 return;
3628         case MONITOR_ELECTION_NEEDED:
3629                 /* can not happen */
3630         case MONITOR_OK:
3631                 break;
3632         }
3633
3634
3635         if (ctdb->tunable.verify_recovery_lock != 0) {
3636                 /* we should have the reclock - check its not stale */
3637                 ret = check_recovery_lock(ctdb);
3638                 if (ret != 0) {
3639                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3640                         ctdb_set_culprit(rec, ctdb->pnn);
3641                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3642                         return;
3643                 }
3644         }
3645
3646
3647         /* is there a pending reload all ips ? */
3648         if (reload_all_ips_request != NULL) {
3649                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3650                 talloc_free(reload_all_ips_request);
3651                 reload_all_ips_request = NULL;
3652         }
3653
3654         /* if there are takeovers requested, perform it and notify the waiters */
3655         if (rec->reallocate_callers) {
3656                 process_ipreallocate_requests(ctdb, rec);
3657         }
3658
3659         /* get the nodemap for all active remote nodes
3660          */
3661         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3662         if (remote_nodemaps == NULL) {
3663                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3664                 return;
3665         }
3666         for(i=0; i<nodemap->num; i++) {
3667                 remote_nodemaps[i] = NULL;
3668         }
3669         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3670                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3671                 return;
3672         } 
3673
3674         /* verify that all other nodes have the same nodemap as we have
3675         */
3676         for (j=0; j<nodemap->num; j++) {
3677                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3678                         continue;
3679                 }
3680
3681                 if (remote_nodemaps[j] == NULL) {
3682                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3683                         ctdb_set_culprit(rec, j);
3684
3685                         return;
3686                 }
3687
3688                 /* if the nodes disagree on how many nodes there are
3689                    then this is a good reason to try recovery
3690                  */
3691                 if (remote_nodemaps[j]->num != nodemap->num) {
3692                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3693                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3694                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3695                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3696                         return;
3697                 }
3698
3699                 /* if the nodes disagree on which nodes exist and are
3700                    active, then that is also a good reason to do recovery
3701                  */
3702                 for (i=0;i<nodemap->num;i++) {
3703                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3704                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3705                                           nodemap->nodes[j].pnn, i, 
3706                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3707                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3708                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3709                                             vnnmap);
3710                                 return;
3711                         }
3712                 }
3713
3714                 /* verify the flags are consistent
3715                 */
3716                 for (i=0; i<nodemap->num; i++) {
3717                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3718                                 continue;
3719                         }
3720                         
3721                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3722                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3723                                   nodemap->nodes[j].pnn, 
3724                                   nodemap->nodes[i].pnn, 
3725                                   remote_nodemaps[j]->nodes[i].flags,
3726                                   nodemap->nodes[i].flags));
3727                                 if (i == j) {
3728                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3729                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3730                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3731                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3732                                                     vnnmap);
3733                                         return;
3734                                 } else {
3735                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3736                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3737                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3738                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3739                                                     vnnmap);
3740                                         return;
3741                                 }
3742                         }
3743                 }
3744         }
3745
3746
3747         /* there better be the same number of lmasters in the vnn map
3748            as there are active nodes or we will have to do a recovery
3749          */
3750         if (vnnmap->size != rec->num_active) {
3751                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3752                           vnnmap->size, rec->num_active));
3753                 ctdb_set_culprit(rec, ctdb->pnn);
3754                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3755                 return;
3756         }
3757
3758         /* verify that all active nodes in the nodemap also exist in 
3759            the vnnmap.
3760          */
3761         for (j=0; j<nodemap->num; j++) {
3762                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3763                         continue;
3764                 }
3765                 if (nodemap->nodes[j].pnn == pnn) {
3766                         continue;
3767                 }
3768
3769                 for (i=0; i<vnnmap->size; i++) {
3770                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3771                                 break;
3772                         }
3773                 }
3774                 if (i == vnnmap->size) {
3775                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3776                                   nodemap->nodes[j].pnn));
3777                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3778                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3779                         return;
3780                 }
3781         }
3782
3783         
3784         /* verify that all other nodes have the same vnnmap
3785            and are from the same generation
3786          */
3787         for (j=0; j<nodemap->num; j++) {
3788                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3789                         continue;
3790                 }
3791                 if (nodemap->nodes[j].pnn == pnn) {
3792                         continue;
3793                 }
3794
3795                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3796                                           mem_ctx, &remote_vnnmap);
3797                 if (ret != 0) {
3798                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3799                                   nodemap->nodes[j].pnn));
3800                         return;
3801                 }
3802
3803                 /* verify the vnnmap generation is the same */
3804                 if (vnnmap->generation != remote_vnnmap->generation) {
3805                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3806                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3807                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3808                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3809                         return;
3810                 }
3811
3812                 /* verify the vnnmap size is the same */
3813                 if (vnnmap->size != remote_vnnmap->size) {
3814                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3815                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3816                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3817                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3818                         return;
3819                 }
3820
3821                 /* verify the vnnmap is the same */
3822                 for (i=0;i<vnnmap->size;i++) {
3823                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3824                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3825                                           nodemap->nodes[j].pnn));
3826                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3827                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3828                                             vnnmap);
3829                                 return;
3830                         }
3831                 }
3832         }
3833
3834         /* we might need to change who has what IP assigned */
3835         if (rec->need_takeover_run) {
3836                 uint32_t culprit = (uint32_t)-1;
3837
3838                 rec->need_takeover_run = false;
3839
3840                 /* update the list of public ips that a node can handle for
3841                    all connected nodes
3842                 */
3843                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3844                 if (ret != 0) {
3845                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3846                                          culprit));
3847                         rec->need_takeover_run = true;
3848                         return;
3849                 }
3850
3851                 /* execute the "startrecovery" event script on all nodes */
3852                 ret = run_startrecovery_eventscript(rec, nodemap);
3853                 if (ret!=0) {
3854                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3855                         ctdb_set_culprit(rec, ctdb->pnn);
3856                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3857                         return;
3858                 }
3859
3860                 /* If takeover run fails, then the offending nodes are
3861                  * assigned ban culprit counts. And we re-try takeover.
3862                  * If takeover run fails repeatedly, the node would get
3863                  * banned.
3864                  *
3865                  * If rec->need_takeover_run is not set to true at this
3866                  * failure, monitoring is disabled cluster-wide (via
3867                  * startrecovery eventscript) and will not get enabled.
3868                  */
3869                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3870                 if (ret != 0) {
3871                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3872                         return;
3873                 }
3874
3875                 /* execute the "recovered" event script on all nodes */
3876                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3877 #if 0
3878 // we cant check whether the event completed successfully
3879 // since this script WILL fail if the node is in recovery mode
3880 // and if that race happens, the code here would just cause a second
3881 // cascading recovery.
3882                 if (ret!=0) {
3883                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3884                         ctdb_set_culprit(rec, ctdb->pnn);
3885                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3886                 }
3887 #endif
3888         }
3889 }
3890
3891 /*
3892   the main monitoring loop
3893  */
3894 static void monitor_cluster(struct ctdb_context *ctdb)
3895 {
3896         struct ctdb_recoverd *rec;
3897
3898         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3899
3900         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3901         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3902
3903         rec->ctdb = ctdb;
3904
3905         rec->priority_time = timeval_current();
3906
3907         /* register a message port for sending memory dumps */
3908         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3909
3910         /* register a message port for requesting logs */
3911         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3912
3913         /* register a message port for clearing logs */
3914         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3915
3916         /* register a message port for recovery elections */
3917         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3918
3919         /* when nodes are disabled/enabled */
3920         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3921
3922         /* when we are asked to puch out a flag change */
3923         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3924
3925         /* register a message port for vacuum fetch */
3926         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3927
3928         /* register a message port for reloadnodes  */
3929         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3930
3931         /* register a message port for performing a takeover run */
3932         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3933
3934         /* register a message port for performing a reload all ips */
3935         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3936
3937         /* register a message port for disabling the ip check for a short while */
3938         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3939
3940         /* register a message port for updating the recovery daemons node assignment for an ip */
3941         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3942
3943         /* register a message port for forcing a rebalance of a node next
3944            reallocation */
3945         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3946
3947         for (;;) {
3948                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3949                 struct timeval start;
3950                 double elapsed;
3951
3952                 if (!mem_ctx) {
3953                         DEBUG(DEBUG_CRIT,(__location__
3954                                           " Failed to create temp context\n"));
3955                         exit(-1);
3956                 }
3957
3958                 start = timeval_current();
3959                 main_loop(ctdb, rec, mem_ctx);
3960                 talloc_free(mem_ctx);
3961
3962                 /* we only check for recovery once every second */
3963                 elapsed = timeval_elapsed(&start);
3964                 if (elapsed < ctdb->tunable.recover_interval) {
3965                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3966                                           - elapsed);
3967                 }
3968         }
3969 }
3970
3971 /*
3972   event handler for when the main ctdbd dies
3973  */
3974 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3975                                  uint16_t flags, void *private_data)
3976 {
3977         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3978         _exit(1);
3979 }
3980
3981 /*
3982   called regularly to verify that the recovery daemon is still running
3983  */
3984 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3985                               struct timeval yt, void *p)
3986 {
3987         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3988
3989         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3990                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3991
3992                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3993                                 ctdb_restart_recd, ctdb);
3994
3995                 return;
3996         }
3997
3998         event_add_timed(ctdb->ev, ctdb->recd_ctx,
3999                         timeval_current_ofs(30, 0),
4000                         ctdb_check_recd, ctdb);
4001 }
4002
4003 static void recd_sig_child_handler(struct event_context *ev,
4004         struct signal_event *se, int signum, int count,
4005         void *dont_care, 
4006         void *private_data)
4007 {
4008 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4009         int status;
4010         pid_t pid = -1;
4011
4012         while (pid != 0) {
4013                 pid = waitpid(-1, &status, WNOHANG);
4014                 if (pid == -1) {
4015                         if (errno != ECHILD) {
4016                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4017                         }
4018                         return;
4019                 }
4020                 if (pid > 0) {
4021                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4022                 }
4023         }
4024 }
4025
4026 /*
4027   startup the recovery daemon as a child of the main ctdb daemon
4028  */
4029 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4030 {
4031         int fd[2];
4032         struct signal_event *se;
4033         struct tevent_fd *fde;
4034
4035         if (pipe(fd) != 0) {
4036                 return -1;
4037         }
4038
4039         ctdb->ctdbd_pid = getpid();
4040
4041         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4042         if (ctdb->recoverd_pid == -1) {
4043                 return -1;
4044         }
4045
4046         if (ctdb->recoverd_pid != 0) {
4047                 talloc_free(ctdb->recd_ctx);
4048                 ctdb->recd_ctx = talloc_new(ctdb);
4049                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4050
4051                 close(fd[0]);
4052                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4053                                 timeval_current_ofs(30, 0),
4054                                 ctdb_check_recd, ctdb);
4055                 return 0;
4056         }
4057
4058         close(fd[1]);
4059
4060         srandom(getpid() ^ time(NULL));
4061
4062         /* Clear the log ringbuffer */
4063         ctdb_clear_log(ctdb);
4064
4065         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4066                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4067                 exit(1);
4068         }
4069
4070         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4071
4072         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4073                      ctdb_recoverd_parent, &fd[0]);
4074         tevent_fd_set_auto_close(fde);
4075
4076         /* set up a handler to pick up sigchld */
4077         se = event_add_signal(ctdb->ev, ctdb,
4078                                      SIGCHLD, 0,
4079                                      recd_sig_child_handler,
4080                                      ctdb);
4081         if (se == NULL) {
4082                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4083                 exit(1);
4084         }
4085
4086         monitor_cluster(ctdb);
4087
4088         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4089         return -1;
4090 }
4091
4092 /*
4093   shutdown the recovery daemon
4094  */
4095 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4096 {
4097         if (ctdb->recoverd_pid == 0) {
4098                 return;
4099         }
4100
4101         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4102         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4103
4104         TALLOC_FREE(ctdb->recd_ctx);
4105         TALLOC_FREE(ctdb->recd_ping_count);
4106 }
4107
4108 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4109                        struct timeval t, void *private_data)
4110 {
4111         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4112
4113         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4114         ctdb_stop_recoverd(ctdb);
4115         ctdb_start_recoverd(ctdb);
4116 }