server/recoverd: if we can't get the recovery lock, ban ourself
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
/*
  one entry in the list of "ctdb ipreallocate" processes that are to be
  called back once the takeover run has finished
 */
struct ip_reallocate_list {
	/* next waiting caller, the list is singly linked */
	struct ip_reallocate_list *next;

	/* the reply descriptor belonging to this caller */
	struct rd_memdump_reply *rd;
};
41
/*
  per node misbehaviour bookkeeping, maintained by
  ctdb_set_culprit_count()
 */
struct ctdb_banning_state {
	/* credits accumulated since the counter was last reset */
	uint32_t count;

	/* when the node was last reported as a culprit */
	struct timeval last_reported_time;
};
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
/*
  timeouts derived from the tunables

  Both macros read the tunables through a variable called "ctdb", so a
  "struct ctdb_context *ctdb" has to be in scope wherever they are used.
 */
#define CONTROL_TIMEOUT() (timeval_current_ofs(ctdb->tunable.recover_timeout, 0))
#define MONITOR_TIMEOUT() (timeval_current_ofs(ctdb->tunable.recover_interval, 0))
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
/* possible outcomes of a monitoring pass */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit > ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
/*
  remember the trouble maker, charging it a single credit
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t one_credit = 1;

	ctdb_set_culprit_count(rec, culprit, one_credit);
}
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all node
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remove node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we dont need to ourself ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already have this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all database any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we dont need to ourself ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644                 if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
/*
  state of one vacuum fetch run, i.e. one batch of records received
  from "srcnode" for one database
 */
struct vacuum_info {
	/* linkage in the rec->vacuum_info list */
	struct vacuum_info *next;
	struct vacuum_info *prev;

	/* the recovery daemon state owning the list */
	struct ctdb_recoverd *rec;

	/* node the records were received from */
	uint32_t srcnode;

	/* database the records belong to */
	struct ctdb_db_context *ctdb_db;

	/* the batch; recs->count is the number of records still to do */
	struct ctdb_marshall_buffer *recs;

	/* next record to process within recs */
	struct ctdb_rec_data *r;
};
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
/*
  timed event callback used by ctdb_wait_timeout: p points at the waiter's
  uint32_t flag, which is set to 1 to tell the waiter that time is up
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
                              struct timeval yt, void *p)
{
        uint32_t *expired = p;

        *expired = 1;
}
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finished election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are or we belive we are the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 return -1;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation ip. 
988    The generation id can not be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevent records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1079                          rec->length + params->len, params->recdata->count));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (rec->ip_check_disable_ctx == NULL) {
1281                         if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1282                                 DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1283                                 rec->need_takeover_run = true;
1284                         }
1285                 }
1286
1287                 /* grab a new shiny list of public ips from the node */
1288                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1289                                         CONTROL_TIMEOUT(),
1290                                         ctdb->nodes[j]->pnn,
1291                                         ctdb->nodes,
1292                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1293                                         &ctdb->nodes[j]->available_public_ips);
1294                 if (ret != 0) {
1295                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1296                                 ctdb->nodes[j]->pnn));
1297                         if (culprit) {
1298                                 *culprit = ctdb->nodes[j]->pnn;
1299                         }
1300                         return -1;
1301                 }
1302         }
1303
1304         return 0;
1305 }
1306
1307 /* when we start a recovery, make sure all nodes use the same reclock file
1308    setting
1309 */
1310 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1311 {
1312         struct ctdb_context *ctdb = rec->ctdb;
1313         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1314         TDB_DATA data;
1315         uint32_t *nodes;
1316
1317         if (ctdb->recovery_lock_file == NULL) {
1318                 data.dptr  = NULL;
1319                 data.dsize = 0;
1320         } else {
1321                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1322                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1323         }
1324
1325         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1326         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1327                                         nodes, 0,
1328                                         CONTROL_TIMEOUT(),
1329                                         false, data,
1330                                         NULL, NULL,
1331                                         rec) != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1333                 talloc_free(tmp_ctx);
1334                 return -1;
1335         }
1336
1337         talloc_free(tmp_ctx);
1338         return 0;
1339 }
1340
1341
1342 /*
1343   we are the recmaster, and recovery is needed - start a recovery run
1344  */
1345 static int do_recovery(struct ctdb_recoverd *rec, 
1346                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1347                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1348 {
1349         struct ctdb_context *ctdb = rec->ctdb;
1350         int i, j, ret;
1351         uint32_t generation;
1352         struct ctdb_dbid_map *dbmap;
1353         TDB_DATA data;
1354         uint32_t *nodes;
1355         struct timeval start_time;
1356         uint32_t culprit = (uint32_t)-1;
1357
1358         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1359
1360         /* if recovery fails, force it again */
1361         rec->need_recovery = true;
1362
1363         for (i=0; i<ctdb->num_nodes; i++) {
1364                 struct ctdb_banning_state *ban_state;
1365
1366                 if (ctdb->nodes[i]->ban_state == NULL) {
1367                         continue;
1368                 }
1369                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1370                 if (ban_state->count < 2*ctdb->num_nodes) {
1371                         continue;
1372                 }
1373                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1374                         ctdb->nodes[i]->pnn, ban_state->count,
1375                         ctdb->tunable.recovery_ban_period));
1376                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1377                 ban_state->count = 0;
1378         }
1379
1380
1381         if (ctdb->tunable.verify_recovery_lock != 0) {
1382                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1383                 start_time = timeval_current();
1384                 if (!ctdb_recovery_lock(ctdb, true)) {
1385                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1386                                          "and ban ourself for %u seconds\n",
1387                                          ctdb->tunable.recovery_ban_period));
1388                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1389                         return -1;
1390                 }
1391                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1392                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1393         }
1394
1395         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1396
1397         /* get a list of all databases */
1398         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1399         if (ret != 0) {
1400                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1401                 return -1;
1402         }
1403
1404         /* we do the db creation before we set the recovery mode, so the freeze happens
1405            on all databases we will be dealing with. */
1406
1407         /* verify that we have all the databases any other node has */
1408         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1409         if (ret != 0) {
1410                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1411                 return -1;
1412         }
1413
1414         /* verify that all other nodes have all our databases */
1415         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1416         if (ret != 0) {
1417                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1418                 return -1;
1419         }
1420         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1421
1422         /* update the database priority for all remote databases */
1423         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1424         if (ret != 0) {
1425                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1426         }
1427         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1428
1429
1430         /* update all other nodes to use the same setting for reclock files
1431            as the local recovery master.
1432         */
1433         sync_recovery_lock_file_across_cluster(rec);
1434
1435         /* set recovery mode to active on all nodes */
1436         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1437         if (ret != 0) {
1438                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1439                 return -1;
1440         }
1441
1442         /* execute the "startrecovery" event script on all nodes */
1443         ret = run_startrecovery_eventscript(rec, nodemap);
1444         if (ret!=0) {
1445                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1446                 return -1;
1447         }
1448
1449         /*
1450           update all nodes to have the same flags that we have
1451          */
1452         for (i=0;i<nodemap->num;i++) {
1453                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1454                         continue;
1455                 }
1456
1457                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1458                 if (ret != 0) {
1459                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1460                         return -1;
1461                 }
1462         }
1463
1464         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1465
1466         /* pick a new generation number */
1467         generation = new_generation();
1468
1469         /* change the vnnmap on this node to use the new generation 
1470            number but not on any other nodes.
1471            this guarantees that if we abort the recovery prematurely
1472            for some reason (a node stops responding?)
1473            that we can just return immediately and we will reenter
1474            recovery shortly again.
1475            I.e. we deliberately leave the cluster with an inconsistent
1476            generation id to allow us to abort recovery at any stage and
1477            just restart it from scratch.
1478          */
1479         vnnmap->generation = generation;
1480         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1481         if (ret != 0) {
1482                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1483                 return -1;
1484         }
1485
1486         data.dptr = (void *)&generation;
1487         data.dsize = sizeof(uint32_t);
1488
1489         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1490         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1491                                         nodes, 0,
1492                                         CONTROL_TIMEOUT(), false, data,
1493                                         NULL,
1494                                         transaction_start_fail_callback,
1495                                         rec) != 0) {
1496                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1497                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1498                                         nodes, 0,
1499                                         CONTROL_TIMEOUT(), false, tdb_null,
1500                                         NULL,
1501                                         NULL,
1502                                         NULL) != 0) {
1503                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1504                 }
1505                 return -1;
1506         }
1507
1508         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1509
1510         for (i=0;i<dbmap->num;i++) {
1511                 ret = recover_database(rec, mem_ctx,
1512                                        dbmap->dbs[i].dbid,
1513                                        dbmap->dbs[i].persistent,
1514                                        pnn, nodemap, generation);
1515                 if (ret != 0) {
1516                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1517                         return -1;
1518                 }
1519         }
1520
1521         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1522
1523         /* commit all the changes */
1524         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1525                                         nodes, 0,
1526                                         CONTROL_TIMEOUT(), false, data,
1527                                         NULL, NULL,
1528                                         NULL) != 0) {
1529                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1530                 return -1;
1531         }
1532
1533         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1534         
1535
1536         /* update the capabilities for all nodes */
1537         ret = update_capabilities(ctdb, nodemap);
1538         if (ret!=0) {
1539                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1540                 return -1;
1541         }
1542
1543         /* build a new vnn map with all the currently active and
1544            unbanned nodes */
1545         generation = new_generation();
1546         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1547         CTDB_NO_MEMORY(ctdb, vnnmap);
1548         vnnmap->generation = generation;
1549         vnnmap->size = 0;
1550         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1551         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1552         for (i=j=0;i<nodemap->num;i++) {
1553                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1554                         continue;
1555                 }
1556                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1557                         /* this node can not be an lmaster */
1558                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1559                         continue;
1560                 }
1561
1562                 vnnmap->size++;
1563                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1564                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1565                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1566
1567         }
1568         if (vnnmap->size == 0) {
1569                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1570                 vnnmap->size++;
1571                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1572                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1573                 vnnmap->map[0] = pnn;
1574         }       
1575
1576         /* update to the new vnnmap on all nodes */
1577         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1580                 return -1;
1581         }
1582
1583         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1584
1585         /* update recmaster to point to us for all nodes */
1586         ret = set_recovery_master(ctdb, nodemap, pnn);
1587         if (ret!=0) {
1588                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1589                 return -1;
1590         }
1591
1592         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1593
1594         /*
1595           update all nodes to have the same flags that we have
1596          */
1597         for (i=0;i<nodemap->num;i++) {
1598                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1599                         continue;
1600                 }
1601
1602                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1603                 if (ret != 0) {
1604                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1605                         return -1;
1606                 }
1607         }
1608
1609         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1610
1611         /* disable recovery mode */
1612         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1615                 return -1;
1616         }
1617
1618         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1619
1620         /*
1621           tell nodes to takeover their public IPs
1622          */
1623         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1624         if (ret != 0) {
1625                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1626                                  culprit));
1627                 return -1;
1628         }
1629         rec->need_takeover_run = false;
1630         ret = ctdb_takeover_run(ctdb, nodemap);
1631         if (ret != 0) {
1632                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1633                 return -1;
1634         }
1635         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1636
1637         /* execute the "recovered" event script on all nodes */
1638         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1639         if (ret!=0) {
1640                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1641                 return -1;
1642         }
1643
1644         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1645
1646         /* send a message to all clients telling them that the cluster 
1647            has been reconfigured */
1648         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1649
1650         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1651
1652         rec->need_recovery = false;
1653
1654         /* we managed to complete a full recovery, make sure to forgive
1655            any past sins by the nodes that could now participate in the
1656            recovery.
1657         */
1658         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1659         for (i=0;i<nodemap->num;i++) {
1660                 struct ctdb_banning_state *ban_state;
1661
1662                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1663                         continue;
1664                 }
1665
1666                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1667                 if (ban_state == NULL) {
1668                         continue;
1669                 }
1670
1671                 ban_state->count = 0;
1672         }
1673
1674
1675         /* We just finished a recovery successfully. 
1676            We now wait for rerecovery_timeout before we allow 
1677            another recovery to take place.
1678         */
1679         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1680         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1681         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1682
1683         return 0;
1684 }
1685
1686
/*
  Payload of a recovery master election message.

  When two candidates are compared, the one connected to more nodes
  wins; ties are broken by the priority time and finally by the pnn.
 */
struct election_message {
        uint32_t num_connected;         /* nodes the sender sees as not disconnected */
        struct timeval priority_time;   /* second ranking criterion */
        uint32_t pnn;                   /* node number of the sender, last tie-break */
        uint32_t node_flags;            /* the sender's own node flags */
};
1697
1698 /*
1699   form this nodes election data
1700  */
1701 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1702 {
1703         int ret, i;
1704         struct ctdb_node_map *nodemap;
1705         struct ctdb_context *ctdb = rec->ctdb;
1706
1707         ZERO_STRUCTP(em);
1708
1709         em->pnn = rec->ctdb->pnn;
1710         em->priority_time = rec->priority_time;
1711
1712         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1713         if (ret != 0) {
1714                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1715                 return;
1716         }
1717
1718         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1719         em->node_flags = rec->node_flags;
1720
1721         for (i=0;i<nodemap->num;i++) {
1722                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1723                         em->num_connected++;
1724                 }
1725         }
1726
1727         /* we shouldnt try to win this election if we cant be a recmaster */
1728         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1729                 em->num_connected = 0;
1730                 em->priority_time = timeval_current();
1731         }
1732
1733         talloc_free(nodemap);
1734 }
1735
1736 /*
1737   see if the given election data wins
1738  */
1739 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1740 {
1741         struct election_message myem;
1742         int cmp = 0;
1743
1744         ctdb_election_data(rec, &myem);
1745
1746         /* we cant win if we dont have the recmaster capability */
1747         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1748                 return false;
1749         }
1750
1751         /* we cant win if we are banned */
1752         if (rec->node_flags & NODE_FLAGS_BANNED) {
1753                 return false;
1754         }       
1755
1756         /* we cant win if we are stopped */
1757         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1758                 return false;
1759         }       
1760
1761         /* we will automatically win if the other node is banned */
1762         if (em->node_flags & NODE_FLAGS_BANNED) {
1763                 return true;
1764         }
1765
1766         /* we will automatically win if the other node is banned */
1767         if (em->node_flags & NODE_FLAGS_STOPPED) {
1768                 return true;
1769         }
1770
1771         /* try to use the most connected node */
1772         if (cmp == 0) {
1773                 cmp = (int)myem.num_connected - (int)em->num_connected;
1774         }
1775
1776         /* then the longest running node */
1777         if (cmp == 0) {
1778                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1779         }
1780
1781         if (cmp == 0) {
1782                 cmp = (int)myem.pnn - (int)em->pnn;
1783         }
1784
1785         return cmp > 0;
1786 }
1787
1788 /*
1789   send out an election request
1790  */
1791 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1792 {
1793         int ret;
1794         TDB_DATA election_data;
1795         struct election_message emsg;
1796         uint64_t srvid;
1797         struct ctdb_context *ctdb = rec->ctdb;
1798
1799         srvid = CTDB_SRVID_RECOVERY;
1800
1801         ctdb_election_data(rec, &emsg);
1802
1803         election_data.dsize = sizeof(struct election_message);
1804         election_data.dptr  = (unsigned char *)&emsg;
1805
1806
1807         /* send an election message to all active nodes */
1808         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1809         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1810
1811
1812         /* A new node that is already frozen has entered the cluster.
1813            The existing nodes are not frozen and dont need to be frozen
1814            until the election has ended and we start the actual recovery
1815         */
1816         if (update_recmaster == true) {
1817                 /* first we assume we will win the election and set 
1818                    recoverymaster to be ourself on the current node
1819                  */
1820                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1821                 if (ret != 0) {
1822                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1823                         return -1;
1824                 }
1825         }
1826
1827
1828         return 0;
1829 }
1830
1831 /*
1832   this function will unban all nodes in the cluster
1833 */
1834 static void unban_all_nodes(struct ctdb_context *ctdb)
1835 {
1836         int ret, i;
1837         struct ctdb_node_map *nodemap;
1838         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1839         
1840         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1841         if (ret != 0) {
1842                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1843                 return;
1844         }
1845
1846         for (i=0;i<nodemap->num;i++) {
1847                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1848                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1849                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1850                 }
1851         }
1852
1853         talloc_free(tmp_ctx);
1854 }
1855
1856
1857 /*
1858   we think we are winning the election - send a broadcast election request
1859  */
1860 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1861 {
1862         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1863         int ret;
1864
1865         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1868         }
1869
1870         talloc_free(rec->send_election_te);
1871         rec->send_election_te = NULL;
1872 }
1873
1874 /*
1875   handler for memory dumps
1876 */
1877 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1878                              TDB_DATA data, void *private_data)
1879 {
1880         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1881         TDB_DATA *dump;
1882         int ret;
1883         struct rd_memdump_reply *rd;
1884
1885         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1886                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1887                 talloc_free(tmp_ctx);
1888                 return;
1889         }
1890         rd = (struct rd_memdump_reply *)data.dptr;
1891
1892         dump = talloc_zero(tmp_ctx, TDB_DATA);
1893         if (dump == NULL) {
1894                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1895                 talloc_free(tmp_ctx);
1896                 return;
1897         }
1898         ret = ctdb_dump_memory(ctdb, dump);
1899         if (ret != 0) {
1900                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1901                 talloc_free(tmp_ctx);
1902                 return;
1903         }
1904
1905 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1906
1907         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1908         if (ret != 0) {
1909                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1910                 talloc_free(tmp_ctx);
1911                 return;
1912         }
1913
1914         talloc_free(tmp_ctx);
1915 }
1916
1917 /*
1918   handler for reload_nodes
1919 */
1920 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1921                              TDB_DATA data, void *private_data)
1922 {
1923         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1924
1925         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1926
1927         reload_nodes_file(rec->ctdb);
1928 }
1929
1930
1931 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1932                               struct timeval yt, void *p)
1933 {
1934         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1935
1936         talloc_free(rec->ip_check_disable_ctx);
1937         rec->ip_check_disable_ctx = NULL;
1938 }
1939
1940
1941 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1942                              TDB_DATA data, void *private_data)
1943 {
1944         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1945         struct ctdb_public_ip *ip;
1946
1947         if (rec->recmaster != rec->ctdb->pnn) {
1948                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1949                 return;
1950         }
1951
1952         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1953                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1954                 return;
1955         }
1956
1957         ip = (struct ctdb_public_ip *)data.dptr;
1958
1959         update_ip_assignment_tree(rec->ctdb, ip);
1960 }
1961
1962
1963 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1964                              TDB_DATA data, void *private_data)
1965 {
1966         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1967         uint32_t timeout;
1968
1969         if (rec->ip_check_disable_ctx != NULL) {
1970                 talloc_free(rec->ip_check_disable_ctx);
1971                 rec->ip_check_disable_ctx = NULL;
1972         }
1973
1974         if (data.dsize != sizeof(uint32_t)) {
1975                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1976                                  "expexting %lu\n", (long unsigned)data.dsize,
1977                                  (long unsigned)sizeof(uint32_t)));
1978                 return;
1979         }
1980         if (data.dptr == NULL) {
1981                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1982                 return;
1983         }
1984
1985         timeout = *((uint32_t *)data.dptr);
1986         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1987
1988         rec->ip_check_disable_ctx = talloc_new(rec);
1989         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1990
1991         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1992 }
1993
1994
1995 /*
1996   handler for ip reallocate, just add it to the list of callers and 
1997   handle this later in the monitor_cluster loop so we do not recurse
1998   with other callers to takeover_run()
1999 */
2000 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2001                              TDB_DATA data, void *private_data)
2002 {
2003         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2004         struct ip_reallocate_list *caller;
2005
2006         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2007                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2008                 return;
2009         }
2010
2011         if (rec->ip_reallocate_ctx == NULL) {
2012                 rec->ip_reallocate_ctx = talloc_new(rec);
2013                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2014         }
2015
2016         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2017         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2018
2019         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2020         caller->next = rec->reallocate_callers;
2021         rec->reallocate_callers = caller;
2022
2023         return;
2024 }
2025
2026 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2027 {
2028         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2029         TDB_DATA result;
2030         int32_t ret;
2031         struct ip_reallocate_list *callers;
2032         uint32_t culprit;
2033
2034         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2035
2036         /* update the list of public ips that a node can handle for
2037            all connected nodes
2038         */
2039         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2040         if (ret != 0) {
2041                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2042                                  culprit));
2043                 rec->need_takeover_run = true;
2044         }
2045         if (ret == 0) {
2046                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2047                 if (ret != 0) {
2048                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2049                                          culprit));
2050                         rec->need_takeover_run = true;
2051                 }
2052         }
2053
2054         result.dsize = sizeof(int32_t);
2055         result.dptr  = (uint8_t *)&ret;
2056
2057         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2058
2059                 /* Someone that sent srvid==0 does not want a reply */
2060                 if (callers->rd->srvid == 0) {
2061                         continue;
2062                 }
2063                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2064                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2065                                   (unsigned long long)callers->rd->srvid));
2066                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2067                 if (ret != 0) {
2068                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2069                                          "message to %u:%llu\n",
2070                                          (unsigned)callers->rd->pnn,
2071                                          (unsigned long long)callers->rd->srvid));
2072                 }
2073         }
2074
2075         talloc_free(tmp_ctx);
2076         talloc_free(rec->ip_reallocate_ctx);
2077         rec->ip_reallocate_ctx = NULL;
2078         rec->reallocate_callers = NULL;
2079         
2080 }
2081
2082
2083 /*
2084   handler for recovery master elections
2085 */
2086 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2087                              TDB_DATA data, void *private_data)
2088 {
2089         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2090         int ret;
2091         struct election_message *em = (struct election_message *)data.dptr;
2092         TALLOC_CTX *mem_ctx;
2093
2094         /* we got an election packet - update the timeout for the election */
2095         talloc_free(rec->election_timeout);
2096         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2097                                                 fast_start ?
2098                                                 timeval_current_ofs(0, 500000) :
2099                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2100                                                 ctdb_election_timeout, rec);
2101
2102         mem_ctx = talloc_new(ctdb);
2103
2104         /* someone called an election. check their election data
2105            and if we disagree and we would rather be the elected node, 
2106            send a new election message to all other nodes
2107          */
2108         if (ctdb_election_win(rec, em)) {
2109                 if (!rec->send_election_te) {
2110                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2111                                                                 timeval_current_ofs(0, 500000),
2112                                                                 election_send_request, rec);
2113                 }
2114                 talloc_free(mem_ctx);
2115                 /*unban_all_nodes(ctdb);*/
2116                 return;
2117         }
2118         
2119         /* we didn't win */
2120         talloc_free(rec->send_election_te);
2121         rec->send_election_te = NULL;
2122
2123         if (ctdb->tunable.verify_recovery_lock != 0) {
2124                 /* release the recmaster lock */
2125                 if (em->pnn != ctdb->pnn &&
2126                     ctdb->recovery_lock_fd != -1) {
2127                         close(ctdb->recovery_lock_fd);
2128                         ctdb->recovery_lock_fd = -1;
2129                         unban_all_nodes(ctdb);
2130                 }
2131         }
2132
2133         /* ok, let that guy become recmaster then */
2134         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2135         if (ret != 0) {
2136                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2137                 talloc_free(mem_ctx);
2138                 return;
2139         }
2140
2141         talloc_free(mem_ctx);
2142         return;
2143 }
2144
2145
2146 /*
2147   force the start of the election process
2148  */
2149 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2150                            struct ctdb_node_map *nodemap)
2151 {
2152         int ret;
2153         struct ctdb_context *ctdb = rec->ctdb;
2154
2155         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2156
2157         /* set all nodes to recovery mode to stop all internode traffic */
2158         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2161                 return;
2162         }
2163
2164         talloc_free(rec->election_timeout);
2165         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2166                                                 fast_start ?
2167                                                 timeval_current_ofs(0, 500000) :
2168                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2169                                                 ctdb_election_timeout, rec);
2170
2171         ret = send_election_request(rec, pnn, true);
2172         if (ret!=0) {
2173                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2174                 return;
2175         }
2176
2177         /* wait for a few seconds to collect all responses */
2178         ctdb_wait_election(rec);
2179 }
2180
2181
2182
2183 /*
2184   handler for when a node changes its flags
2185 */
2186 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2187                             TDB_DATA data, void *private_data)
2188 {
2189         int ret;
2190         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2191         struct ctdb_node_map *nodemap=NULL;
2192         TALLOC_CTX *tmp_ctx;
2193         uint32_t changed_flags;
2194         int i;
2195         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2196         int disabled_flag_changed;
2197
2198         if (data.dsize != sizeof(*c)) {
2199                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2200                 return;
2201         }
2202
2203         tmp_ctx = talloc_new(ctdb);
2204         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2205
2206         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2207         if (ret != 0) {
2208                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2209                 talloc_free(tmp_ctx);
2210                 return;         
2211         }
2212
2213
2214         for (i=0;i<nodemap->num;i++) {
2215                 if (nodemap->nodes[i].pnn == c->pnn) break;
2216         }
2217
2218         if (i == nodemap->num) {
2219                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2220                 talloc_free(tmp_ctx);
2221                 return;
2222         }
2223
2224         changed_flags = c->old_flags ^ c->new_flags;
2225
2226         if (nodemap->nodes[i].flags != c->new_flags) {
2227                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2228         }
2229
2230         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2231
2232         nodemap->nodes[i].flags = c->new_flags;
2233
2234         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2235                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2236
2237         if (ret == 0) {
2238                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2239                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2240         }
2241         
2242         if (ret == 0 &&
2243             ctdb->recovery_master == ctdb->pnn &&
2244             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2245                 /* Only do the takeover run if the perm disabled or unhealthy
2246                    flags changed since these will cause an ip failover but not
2247                    a recovery.
2248                    If the node became disconnected or banned this will also
2249                    lead to an ip address failover but that is handled 
2250                    during recovery
2251                 */
2252                 if (disabled_flag_changed) {
2253                         rec->need_takeover_run = true;
2254                 }
2255         }
2256
2257         talloc_free(tmp_ctx);
2258 }
2259
2260 /*
2261   handler for when we need to push out flag changes ot all other nodes
2262 */
2263 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2264                             TDB_DATA data, void *private_data)
2265 {
2266         int ret;
2267         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2268         struct ctdb_node_map *nodemap=NULL;
2269         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2270         uint32_t recmaster;
2271         uint32_t *nodes;
2272
2273         /* find the recovery master */
2274         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2275         if (ret != 0) {
2276                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2277                 talloc_free(tmp_ctx);
2278                 return;
2279         }
2280
2281         /* read the node flags from the recmaster */
2282         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2283         if (ret != 0) {
2284                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2285                 talloc_free(tmp_ctx);
2286                 return;
2287         }
2288         if (c->pnn >= nodemap->num) {
2289                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2290                 talloc_free(tmp_ctx);
2291                 return;
2292         }
2293
2294         /* send the flags update to all connected nodes */
2295         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2296
2297         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2298                                       nodes, 0, CONTROL_TIMEOUT(),
2299                                       false, data,
2300                                       NULL, NULL,
2301                                       NULL) != 0) {
2302                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2303
2304                 talloc_free(tmp_ctx);
2305                 return;
2306         }
2307
2308         talloc_free(tmp_ctx);
2309 }
2310
2311
2312 struct verify_recmode_normal_data {
2313         uint32_t count;
2314         enum monitor_result status;
2315 };
2316
2317 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2318 {
2319         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2320
2321
2322         /* one more node has responded with recmode data*/
2323         rmdata->count--;
2324
2325         /* if we failed to get the recmode, then return an error and let
2326            the main loop try again.
2327         */
2328         if (state->state != CTDB_CONTROL_DONE) {
2329                 if (rmdata->status == MONITOR_OK) {
2330                         rmdata->status = MONITOR_FAILED;
2331                 }
2332                 return;
2333         }
2334
2335         /* if we got a response, then the recmode will be stored in the
2336            status field
2337         */
2338         if (state->status != CTDB_RECOVERY_NORMAL) {
2339                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2340                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2341         }
2342
2343         return;
2344 }
2345
2346
2347 /* verify that all nodes are in normal recovery mode */
2348 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2349 {
2350         struct verify_recmode_normal_data *rmdata;
2351         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2352         struct ctdb_client_control_state *state;
2353         enum monitor_result status;
2354         int j;
2355         
2356         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2357         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2358         rmdata->count  = 0;
2359         rmdata->status = MONITOR_OK;
2360
2361         /* loop over all active nodes and send an async getrecmode call to 
2362            them*/
2363         for (j=0; j<nodemap->num; j++) {
2364                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2365                         continue;
2366                 }
2367                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2368                                         CONTROL_TIMEOUT(), 
2369                                         nodemap->nodes[j].pnn);
2370                 if (state == NULL) {
2371                         /* we failed to send the control, treat this as 
2372                            an error and try again next iteration
2373                         */                      
2374                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2375                         talloc_free(mem_ctx);
2376                         return MONITOR_FAILED;
2377                 }
2378
2379                 /* set up the callback functions */
2380                 state->async.fn = verify_recmode_normal_callback;
2381                 state->async.private_data = rmdata;
2382
2383                 /* one more control to wait for to complete */
2384                 rmdata->count++;
2385         }
2386
2387
2388         /* now wait for up to the maximum number of seconds allowed
2389            or until all nodes we expect a response from has replied
2390         */
2391         while (rmdata->count > 0) {
2392                 event_loop_once(ctdb->ev);
2393         }
2394
2395         status = rmdata->status;
2396         talloc_free(mem_ctx);
2397         return status;
2398 }
2399
2400
2401 struct verify_recmaster_data {
2402         struct ctdb_recoverd *rec;
2403         uint32_t count;
2404         uint32_t pnn;
2405         enum monitor_result status;
2406 };
2407
2408 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2409 {
2410         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2411
2412
2413         /* one more node has responded with recmaster data*/
2414         rmdata->count--;
2415
2416         /* if we failed to get the recmaster, then return an error and let
2417            the main loop try again.
2418         */
2419         if (state->state != CTDB_CONTROL_DONE) {
2420                 if (rmdata->status == MONITOR_OK) {
2421                         rmdata->status = MONITOR_FAILED;
2422                 }
2423                 return;
2424         }
2425
2426         /* if we got a response, then the recmaster will be stored in the
2427            status field
2428         */
2429         if (state->status != rmdata->pnn) {
2430                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2431                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2432                 rmdata->status = MONITOR_ELECTION_NEEDED;
2433         }
2434
2435         return;
2436 }
2437
2438
2439 /* verify that all nodes agree that we are the recmaster */
2440 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2441 {
2442         struct ctdb_context *ctdb = rec->ctdb;
2443         struct verify_recmaster_data *rmdata;
2444         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2445         struct ctdb_client_control_state *state;
2446         enum monitor_result status;
2447         int j;
2448         
2449         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2450         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2451         rmdata->rec    = rec;
2452         rmdata->count  = 0;
2453         rmdata->pnn    = pnn;
2454         rmdata->status = MONITOR_OK;
2455
2456         /* loop over all active nodes and send an async getrecmaster call to 
2457            them*/
2458         for (j=0; j<nodemap->num; j++) {
2459                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2460                         continue;
2461                 }
2462                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2463                                         CONTROL_TIMEOUT(),
2464                                         nodemap->nodes[j].pnn);
2465                 if (state == NULL) {
2466                         /* we failed to send the control, treat this as 
2467                            an error and try again next iteration
2468                         */                      
2469                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2470                         talloc_free(mem_ctx);
2471                         return MONITOR_FAILED;
2472                 }
2473
2474                 /* set up the callback functions */
2475                 state->async.fn = verify_recmaster_callback;
2476                 state->async.private_data = rmdata;
2477
2478                 /* one more control to wait for to complete */
2479                 rmdata->count++;
2480         }
2481
2482
2483         /* now wait for up to the maximum number of seconds allowed
2484            or until all nodes we expect a response from has replied
2485         */
2486         while (rmdata->count > 0) {
2487                 event_loop_once(ctdb->ev);
2488         }
2489
2490         status = rmdata->status;
2491         talloc_free(mem_ctx);
2492         return status;
2493 }
2494
2495
2496 /* called to check that the local allocation of public ip addresses is ok.
2497 */
2498 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2499 {
2500         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2501         struct ctdb_control_get_ifaces *ifaces = NULL;
2502         struct ctdb_all_public_ips *ips = NULL;
2503         struct ctdb_uptime *uptime1 = NULL;
2504         struct ctdb_uptime *uptime2 = NULL;
2505         int ret, j;
2506         bool need_iface_check = false;
2507         bool need_takeover_run = false;
2508
2509         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2510                                 CTDB_CURRENT_NODE, &uptime1);
2511         if (ret != 0) {
2512                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2513                 talloc_free(mem_ctx);
2514                 return -1;
2515         }
2516
2517
2518         /* read the interfaces from the local node */
2519         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2520         if (ret != 0) {
2521                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2522                 talloc_free(mem_ctx);
2523                 return -1;
2524         }
2525
2526         if (!rec->ifaces) {
2527                 need_iface_check = true;
2528         } else if (rec->ifaces->num != ifaces->num) {
2529                 need_iface_check = true;
2530         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2531                 need_iface_check = true;
2532         }
2533
2534         if (need_iface_check) {
2535                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2536                                      "local node %u - force takeover run\n",
2537                                      pnn));
2538                 need_takeover_run = true;
2539         }
2540
2541         /* read the ip allocation from the local node */
2542         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2545                 talloc_free(mem_ctx);
2546                 return -1;
2547         }
2548
2549         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2550                                 CTDB_CURRENT_NODE, &uptime2);
2551         if (ret != 0) {
2552                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2553                 talloc_free(mem_ctx);
2554                 return -1;
2555         }
2556
2557         /* skip the check if the startrecovery time has changed */
2558         if (timeval_compare(&uptime1->last_recovery_started,
2559                             &uptime2->last_recovery_started) != 0) {
2560                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2561                 talloc_free(mem_ctx);
2562                 return 0;
2563         }
2564
2565         /* skip the check if the endrecovery time has changed */
2566         if (timeval_compare(&uptime1->last_recovery_finished,
2567                             &uptime2->last_recovery_finished) != 0) {
2568                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2569                 talloc_free(mem_ctx);
2570                 return 0;
2571         }
2572
2573         /* skip the check if we have started but not finished recovery */
2574         if (timeval_compare(&uptime1->last_recovery_finished,
2575                             &uptime1->last_recovery_started) != 1) {
2576                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2577                 talloc_free(mem_ctx);
2578
2579                 return 0;
2580         }
2581
2582         talloc_free(rec->ifaces);
2583         rec->ifaces = talloc_steal(rec, ifaces);
2584
2585         /* verify that we have the ip addresses we should have
2586            and we dont have ones we shouldnt have.
2587            if we find an inconsistency we set recmode to
2588            active on the local node and wait for the recmaster
2589            to do a full blown recovery
2590         */
2591         for (j=0; j<ips->num; j++) {
2592                 if (ips->ips[j].pnn == pnn) {
2593                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2594                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2595                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2596                                 need_takeover_run = true;
2597                         }
2598                 } else {
2599                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2600                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2601                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2602                                 need_takeover_run = true;
2603                         }
2604                 }
2605         }
2606
2607         if (need_takeover_run) {
2608                 struct takeover_run_reply rd;
2609                 TDB_DATA data;
2610
2611                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2612
2613                 rd.pnn = ctdb->pnn;
2614                 rd.srvid = 0;
2615                 data.dptr = (uint8_t *)&rd;
2616                 data.dsize = sizeof(rd);
2617
2618                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2619                 if (ret != 0) {
2620                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2621                 }
2622         }
2623         talloc_free(mem_ctx);
2624         return 0;
2625 }
2626
2627
2628 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2629 {
2630         struct ctdb_node_map **remote_nodemaps = callback_data;
2631
2632         if (node_pnn >= ctdb->num_nodes) {
2633                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2634                 return;
2635         }
2636
2637         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2638
2639 }
2640
2641 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2642         struct ctdb_node_map *nodemap,
2643         struct ctdb_node_map **remote_nodemaps)
2644 {
2645         uint32_t *nodes;
2646
2647         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2648         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2649                                         nodes, 0,
2650                                         CONTROL_TIMEOUT(), false, tdb_null,
2651                                         async_getnodemap_callback,
2652                                         NULL,
2653                                         remote_nodemaps) != 0) {
2654                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2655
2656                 return -1;
2657         }
2658
2659         return 0;
2660 }
2661
/* state of a recovery lock check. the OK and FAILED values are also the
   byte sent through the pipe by the check_reclock child process.
*/
enum reclock_child_status {
        RECLOCK_CHECKING,       /* no answer from the child yet */
        RECLOCK_OK,             /* the child could read the reclock file */
        RECLOCK_FAILED,         /* the child reported an error */
        RECLOCK_TIMEOUT         /* the timer fired before the child answered */
};
2663 struct ctdb_check_reclock_state {
2664         struct ctdb_context *ctdb;
2665         struct timeval start_time;
2666         int fd[2];
2667         pid_t child;
2668         struct timed_event *te;
2669         struct fd_event *fde;
2670         enum reclock_child_status status;
2671 };
2672
2673 /* when we free the reclock state we must kill any child process.
2674 */
2675 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2676 {
2677         struct ctdb_context *ctdb = state->ctdb;
2678
2679         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2680
2681         if (state->fd[0] != -1) {
2682                 close(state->fd[0]);
2683                 state->fd[0] = -1;
2684         }
2685         if (state->fd[1] != -1) {
2686                 close(state->fd[1]);
2687                 state->fd[1] = -1;
2688         }
2689         kill(state->child, SIGKILL);
2690         return 0;
2691 }
2692
2693 /*
2694   called if our check_reclock child times out. this would happen if
2695   i/o to the reclock file blocks.
2696  */
2697 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2698                                          struct timeval t, void *private_data)
2699 {
2700         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2701                                            struct ctdb_check_reclock_state);
2702
2703         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2704         state->status = RECLOCK_TIMEOUT;
2705 }
2706
2707 /* this is called when the child process has completed checking the reclock
2708    file and has written data back to us through the pipe.
2709 */
2710 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2711                              uint16_t flags, void *private_data)
2712 {
2713         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2714                                              struct ctdb_check_reclock_state);
2715         char c = 0;
2716         int ret;
2717
2718         /* we got a response from our child process so we can abort the
2719            timeout.
2720         */
2721         talloc_free(state->te);
2722         state->te = NULL;
2723
2724         ret = read(state->fd[0], &c, 1);
2725         if (ret != 1 || c != RECLOCK_OK) {
2726                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2727                 state->status = RECLOCK_FAILED;
2728
2729                 return;
2730         }
2731
2732         state->status = RECLOCK_OK;
2733         return;
2734 }
2735
2736 static int check_recovery_lock(struct ctdb_context *ctdb)
2737 {
2738         int ret;
2739         struct ctdb_check_reclock_state *state;
2740         pid_t parent = getpid();
2741
2742         if (ctdb->recovery_lock_fd == -1) {
2743                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2744                 return -1;
2745         }
2746
2747         state = talloc(ctdb, struct ctdb_check_reclock_state);
2748         CTDB_NO_MEMORY(ctdb, state);
2749
2750         state->ctdb = ctdb;
2751         state->start_time = timeval_current();
2752         state->status = RECLOCK_CHECKING;
2753         state->fd[0] = -1;
2754         state->fd[1] = -1;
2755
2756         ret = pipe(state->fd);
2757         if (ret != 0) {
2758                 talloc_free(state);
2759                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2760                 return -1;
2761         }
2762
2763         state->child = fork();
2764         if (state->child == (pid_t)-1) {
2765                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2766                 close(state->fd[0]);
2767                 state->fd[0] = -1;
2768                 close(state->fd[1]);
2769                 state->fd[1] = -1;
2770                 talloc_free(state);
2771                 return -1;
2772         }
2773
2774         if (state->child == 0) {
2775                 char cc = RECLOCK_OK;
2776                 close(state->fd[0]);
2777                 state->fd[0] = -1;
2778
2779                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2780                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2781                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2782                         cc = RECLOCK_FAILED;
2783                 }
2784
2785                 write(state->fd[1], &cc, 1);
2786                 /* make sure we die when our parent dies */
2787                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2788                         sleep(5);
2789                         write(state->fd[1], &cc, 1);
2790                 }
2791                 _exit(0);
2792         }
2793         close(state->fd[1]);
2794         state->fd[1] = -1;
2795         set_close_on_exec(state->fd[0]);
2796
2797         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2798
2799         talloc_set_destructor(state, check_reclock_destructor);
2800
2801         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2802                                     ctdb_check_reclock_timeout, state);
2803         if (state->te == NULL) {
2804                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2805                 talloc_free(state);
2806                 return -1;
2807         }
2808
2809         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2810                                 EVENT_FD_READ,
2811                                 reclock_child_handler,
2812                                 (void *)state);
2813
2814         if (state->fde == NULL) {
2815                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2816                 talloc_free(state);
2817                 return -1;
2818         }
2819         tevent_fd_set_auto_close(state->fde);
2820
2821         while (state->status == RECLOCK_CHECKING) {
2822                 event_loop_once(ctdb->ev);
2823         }
2824
2825         if (state->status == RECLOCK_FAILED) {
2826                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2827                 close(ctdb->recovery_lock_fd);
2828                 ctdb->recovery_lock_fd = -1;
2829                 talloc_free(state);
2830                 return -1;
2831         }
2832
2833         talloc_free(state);
2834         return 0;
2835 }
2836
2837 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2838 {
2839         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2840         const char *reclockfile;
2841
2842         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2843                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2844                 talloc_free(tmp_ctx);
2845                 return -1;      
2846         }
2847
2848         if (reclockfile == NULL) {
2849                 if (ctdb->recovery_lock_file != NULL) {
2850                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2851                         talloc_free(ctdb->recovery_lock_file);
2852                         ctdb->recovery_lock_file = NULL;
2853                         if (ctdb->recovery_lock_fd != -1) {
2854                                 close(ctdb->recovery_lock_fd);
2855                                 ctdb->recovery_lock_fd = -1;
2856                         }
2857                 }
2858                 ctdb->tunable.verify_recovery_lock = 0;
2859                 talloc_free(tmp_ctx);
2860                 return 0;
2861         }
2862
2863         if (ctdb->recovery_lock_file == NULL) {
2864                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2865                 if (ctdb->recovery_lock_fd != -1) {
2866                         close(ctdb->recovery_lock_fd);
2867                         ctdb->recovery_lock_fd = -1;
2868                 }
2869                 talloc_free(tmp_ctx);
2870                 return 0;
2871         }
2872
2873
2874         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2875                 talloc_free(tmp_ctx);
2876                 return 0;
2877         }
2878
2879         talloc_free(ctdb->recovery_lock_file);
2880         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2881         ctdb->tunable.verify_recovery_lock = 0;
2882         if (ctdb->recovery_lock_fd != -1) {
2883                 close(ctdb->recovery_lock_fd);
2884                 ctdb->recovery_lock_fd = -1;
2885         }
2886
2887         talloc_free(tmp_ctx);
2888         return 0;
2889 }
2890
2891 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2892                       TALLOC_CTX *mem_ctx)
2893 {
2894         uint32_t pnn;
2895         struct ctdb_node_map *nodemap=NULL;
2896         struct ctdb_node_map *recmaster_nodemap=NULL;
2897         struct ctdb_node_map **remote_nodemaps=NULL;
2898         struct ctdb_vnn_map *vnnmap=NULL;
2899         struct ctdb_vnn_map *remote_vnnmap=NULL;
2900         int32_t debug_level;
2901         int i, j, ret;
2902
2903
2904
2905         /* verify that the main daemon is still running */
2906         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2907                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2908                 exit(-1);
2909         }
2910
2911         /* ping the local daemon to tell it we are alive */
2912         ctdb_ctrl_recd_ping(ctdb);
2913
2914         if (rec->election_timeout) {
2915                 /* an election is in progress */
2916                 return;
2917         }
2918
2919         /* read the debug level from the parent and update locally */
2920         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2921         if (ret !=0) {
2922                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2923                 return;
2924         }
2925         LogLevel = debug_level;
2926
2927
2928         /* We must check if we need to ban a node here but we want to do this
2929            as early as possible so we dont wait until we have pulled the node
2930            map from the local node. thats why we have the hardcoded value 20
2931         */
2932         for (i=0; i<ctdb->num_nodes; i++) {
2933                 struct ctdb_banning_state *ban_state;
2934
2935                 if (ctdb->nodes[i]->ban_state == NULL) {
2936                         continue;
2937                 }
2938                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2939                 if (ban_state->count < 20) {
2940                         continue;
2941                 }
2942                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2943                         ctdb->nodes[i]->pnn, ban_state->count,
2944                         ctdb->tunable.recovery_ban_period));
2945                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2946                 ban_state->count = 0;
2947         }
2948
2949         /* get relevant tunables */
2950         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2951         if (ret != 0) {
2952                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2953                 return;
2954         }
2955
2956         /* get the current recovery lock file from the server */
2957         if (update_recovery_lock_file(ctdb) != 0) {
2958                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2959                 return;
2960         }
2961
2962         /* Make sure that if recovery lock verification becomes disabled when
2963            we close the file
2964         */
2965         if (ctdb->tunable.verify_recovery_lock == 0) {
2966                 if (ctdb->recovery_lock_fd != -1) {
2967                         close(ctdb->recovery_lock_fd);
2968                         ctdb->recovery_lock_fd = -1;
2969                 }
2970         }
2971
2972         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2973         if (pnn == (uint32_t)-1) {
2974                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2975                 return;
2976         }
2977
2978         /* get the vnnmap */
2979         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2980         if (ret != 0) {
2981                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2982                 return;
2983         }
2984
2985
2986         /* get number of nodes */
2987         if (rec->nodemap) {
2988                 talloc_free(rec->nodemap);
2989                 rec->nodemap = NULL;
2990                 nodemap=NULL;
2991         }
2992         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2993         if (ret != 0) {
2994                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2995                 return;
2996         }
2997         nodemap = rec->nodemap;
2998
2999         /* check which node is the recovery master */
3000         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3001         if (ret != 0) {
3002                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3003                 return;
3004         }
3005
3006         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3007         if (rec->recmaster != pnn) {
3008                 if (rec->ip_reallocate_ctx != NULL) {
3009                         talloc_free(rec->ip_reallocate_ctx);
3010                         rec->ip_reallocate_ctx = NULL;
3011                         rec->reallocate_callers = NULL;
3012                 }
3013         }
3014
3015         if (rec->recmaster == (uint32_t)-1) {
3016                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3017                 force_election(rec, pnn, nodemap);
3018                 return;
3019         }
3020
3021
3022         /* if the local daemon is STOPPED, we verify that the databases are
3023            also frozen and thet the recmode is set to active 
3024         */
3025         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3026                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3027                 if (ret != 0) {
3028                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3029                 }
3030                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3031                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3032
3033                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3034                         if (ret != 0) {
3035                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3036                                 return;
3037                         }
3038                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3039                         if (ret != 0) {
3040                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3041
3042                                 return;
3043                         }
3044                         return;
3045                 }
3046         }
3047         /* If the local node is stopped, verify we are not the recmaster 
3048            and yield this role if so
3049         */
3050         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3051                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3052                 force_election(rec, pnn, nodemap);
3053                 return;
3054         }
3055         
3056         /* check that we (recovery daemon) and the local ctdb daemon
3057            agrees on whether we are banned or not
3058         */
3059 //qqq
3060
3061         /* remember our own node flags */
3062         rec->node_flags = nodemap->nodes[pnn].flags;
3063
3064         /* count how many active nodes there are */
3065         rec->num_active    = 0;
3066         rec->num_connected = 0;
3067         for (i=0; i<nodemap->num; i++) {
3068                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3069                         rec->num_active++;
3070                 }
3071                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3072                         rec->num_connected++;
3073                 }
3074         }
3075
3076
3077         /* verify that the recmaster node is still active */
3078         for (j=0; j<nodemap->num; j++) {
3079                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3080                         break;
3081                 }
3082         }
3083
3084         if (j == nodemap->num) {
3085                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3086                 force_election(rec, pnn, nodemap);
3087                 return;
3088         }
3089
3090         /* if recovery master is disconnected we must elect a new recmaster */
3091         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3092                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3093                 force_election(rec, pnn, nodemap);
3094                 return;
3095         }
3096
3097         /* grab the nodemap from the recovery master to check if it is banned */
3098         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3099                                    mem_ctx, &recmaster_nodemap);
3100         if (ret != 0) {
3101                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3102                           nodemap->nodes[j].pnn));
3103                 return;
3104         }
3105
3106
3107         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3108                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3109                 force_election(rec, pnn, nodemap);
3110                 return;
3111         }
3112
3113
3114         /* verify that we have all ip addresses we should have and we don't
3115          * have addresses we shouldn't have.
3116          */ 
3117         if (ctdb->do_checkpublicip) {
3118                 if (rec->ip_check_disable_ctx == NULL) {
3119                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3120                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3121                         }
3122                 }
3123         }
3124
3125
3126         /* if we are not the recmaster then we do not need to check
3127            if recovery is needed
3128          */
3129         if (pnn != rec->recmaster) {
3130                 return;
3131         }
3132
3133
3134         /* ensure our local copies of flags are right */
3135         ret = update_local_flags(rec, nodemap);
3136         if (ret == MONITOR_ELECTION_NEEDED) {
3137                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3138                 force_election(rec, pnn, nodemap);
3139                 return;
3140         }
3141         if (ret != MONITOR_OK) {
3142                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3143                 return;
3144         }
3145
3146         if (ctdb->num_nodes != nodemap->num) {
3147                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3148                 reload_nodes_file(ctdb);
3149                 return;
3150         }
3151
3152         /* verify that all active nodes agree that we are the recmaster */
3153         switch (verify_recmaster(rec, nodemap, pnn)) {
3154         case MONITOR_RECOVERY_NEEDED:
3155                 /* can not happen */
3156                 return;
3157         case MONITOR_ELECTION_NEEDED:
3158                 force_election(rec, pnn, nodemap);
3159                 return;
3160         case MONITOR_OK:
3161                 break;
3162         case MONITOR_FAILED:
3163                 return;
3164         }
3165
3166
3167         if (rec->need_recovery) {
3168                 /* a previous recovery didn't finish */
3169                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3170                 return;
3171         }
3172
3173         /* verify that all active nodes are in normal mode 
3174            and not in recovery mode 
3175         */
3176         switch (verify_recmode(ctdb, nodemap)) {
3177         case MONITOR_RECOVERY_NEEDED:
3178                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3179                 return;
3180         case MONITOR_FAILED:
3181                 return;
3182         case MONITOR_ELECTION_NEEDED:
3183                 /* can not happen */
3184         case MONITOR_OK:
3185                 break;
3186         }
3187
3188
3189         if (ctdb->tunable.verify_recovery_lock != 0) {
3190                 /* we should have the reclock - check it's not stale */
3191                 ret = check_recovery_lock(ctdb);
3192                 if (ret != 0) {
3193                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3194                         ctdb_set_culprit(rec, ctdb->pnn);
3195                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3196                         return;
3197                 }
3198         }
3199
3200         /* if there are takeovers requested, perform it and notify the waiters */
3201         if (rec->reallocate_callers) {
3202                 process_ipreallocate_requests(ctdb, rec);
3203         }
3204
3205         /* get the nodemap for all active remote nodes
3206          */
3207         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3208         if (remote_nodemaps == NULL) {
3209                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3210                 return;
3211         }
3212         for(i=0; i<nodemap->num; i++) {
3213                 remote_nodemaps[i] = NULL;
3214         }
3215         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3216                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3217                 return;
3218         } 
3219
3220         /* verify that all other nodes have the same nodemap as we have
3221         */
3222         for (j=0; j<nodemap->num; j++) {
3223                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3224                         continue;
3225                 }
3226
3227                 if (remote_nodemaps[j] == NULL) {
3228                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3229                         ctdb_set_culprit(rec, j);
3230
3231                         return;
3232                 }
3233
3234                 /* if the nodes disagree on how many nodes there are
3235                    then this is a good reason to try recovery
3236                  */
3237                 if (remote_nodemaps[j]->num != nodemap->num) {
3238                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3239                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3240                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3241                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3242                         return;
3243                 }
3244
3245                 /* if the nodes disagree on which nodes exist and are
3246                    active, then that is also a good reason to do recovery
3247                  */
3248                 for (i=0;i<nodemap->num;i++) {
3249                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3250                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3251                                           nodemap->nodes[j].pnn, i, 
3252                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3253                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3254                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3255                                             vnnmap);
3256                                 return;
3257                         }
3258                 }
3259
3260                 /* verify the flags are consistent
3261                 */
3262       &